Skip to content

Instantly share code, notes, and snippets.

@aronchick
Created May 7, 2022 00:40
Show Gist options
  • Save aronchick/25d6fca71df0ef86846c40bec5cbc2c3 to your computer and use it in GitHub Desktop.
Save aronchick/25d6fca71df0ef86846c40bec5cbc2c3 to your computer and use it in GitHub Desktop.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
# Workflow identity. generateName, annotations and labels are children of
# `metadata`; they must be indented beneath it to be read as such.
metadata:
  generateName: compilation-of-pipelines-
  # NOTE(review): pipeline_spec below carries plain-text defaults for
  # AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY. Consider sourcing these from a
  # Kubernetes Secret rather than from pipeline parameter defaults.
  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.12, pipelines.kubeflow.org/pipeline_compilation_time: '2022-05-06T23:57:02.426196',
    pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "", "name": "context",
      "optional": true}, {"default": "", "name": "metadata_url", "optional": true},
      {"default": "minio", "name": "AWS_ACCESS_KEY_ID", "optional": true, "type":
      "String"}, {"default": "minio123", "name": "AWS_SECRET_ACCESS_KEY", "optional":
      true, "type": "String"}, {"default": "http://combinator-minio.mlflow.svc.cluster.local:9000",
      "name": "MLFLOW_S3_ENDPOINT_URL", "optional": true, "type": "String"}, {"default":
      "http://combinator-mlflow.mlflow.svc.cluster.local:5000", "name": "MLFLOW_TRACKING_URI",
      "optional": true, "type": "String"}], "name": "Compilation of pipelines"}'}
  labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.12}
spec:
  # Name of the template that Argo starts the workflow from.
  entrypoint: compilation-of-pipelines
  templates:
  # DAG template: runs run-info-fn first, then the three SAME steps in order.
  # Each step receives the pipeline parameters plus the run-info output, and
  # steps 001/002 additionally consume the previous step's output_context
  # artifact.
  - name: compilation-of-pipelines
    inputs:
      parameters:
      - {name: AWS_ACCESS_KEY_ID}
      - {name: AWS_SECRET_ACCESS_KEY}
      - {name: MLFLOW_S3_ENDPOINT_URL}
      - {name: MLFLOW_TRACKING_URI}
      - {name: metadata_url}
    dag:
      tasks:
      - {name: run-info-fn, template: run-info-fn}
      - name: same-step-000-a29af0569ab04006b678357f767987db-fn
        template: same-step-000-a29af0569ab04006b678357f767987db-fn
        dependencies: [run-info-fn]
        arguments:
          parameters:
          - {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'}
          - {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'}
          - {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'}
          - {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'}
          - {name: metadata_url, value: '{{inputs.parameters.metadata_url}}'}
          - {name: run-info-fn-run_info, value: '{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}'}
      - name: same-step-001-60ad85fb69284d98992da58abd28344c-fn
        template: same-step-001-60ad85fb69284d98992da58abd28344c-fn
        dependencies: [run-info-fn, same-step-000-a29af0569ab04006b678357f767987db-fn]
        arguments:
          parameters:
          - {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'}
          - {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'}
          - {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'}
          - {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'}
          - {name: metadata_url, value: '{{inputs.parameters.metadata_url}}'}
          - {name: run-info-fn-run_info, value: '{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}'}
          artifacts:
          - {name: same-step-000-a29af0569ab04006b678357f767987db-fn-output_context,
            from: '{{tasks.same-step-000-a29af0569ab04006b678357f767987db-fn.outputs.artifacts.same-step-000-a29af0569ab04006b678357f767987db-fn-output_context}}'}
      - name: same-step-002-b7dbb05511774bd7bab385301d43b388-fn
        template: same-step-002-b7dbb05511774bd7bab385301d43b388-fn
        dependencies: [run-info-fn, same-step-001-60ad85fb69284d98992da58abd28344c-fn]
        arguments:
          parameters:
          - {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'}
          - {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'}
          - {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'}
          - {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'}
          - {name: metadata_url, value: '{{inputs.parameters.metadata_url}}'}
          - {name: run-info-fn-run_info, value: '{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}'}
          artifacts:
          - {name: same-step-001-60ad85fb69284d98992da58abd28344c-fn-output_context,
            from: '{{tasks.same-step-001-60ad85fb69284d98992da58abd28344c-fn.outputs.artifacts.same-step-001-60ad85fb69284d98992da58abd28344c-fn-output_context}}'}
  # Container template: installs kfp and dill, then runs the embedded Python
  # program, which looks up the current run through the KFP API and writes a
  # base64-encoded dill dump of the run details to /tmp/outputs/run_info/data.
  - name: run-info-fn
    container:
      args: [--run-id, '{{workflow.uid}}', '----output-paths', /tmp/outputs/run_info/data]
      # The command is a chain: the first `sh -c` installs the dependencies and
      # then executes the remaining items ("$0" "$@"); the second `sh -ec`
      # writes the Python source (its "$0") to a temp file and runs it with the
      # container args.
      command:
      - sh
      - -c
      - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
        'kfp' 'dill' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
        --no-warn-script-location 'kfp' 'dill' --user) && "$0" "$@"
      - sh
      - -ec
      - |
        program_path=$(mktemp)
        printf "%s" "$0" > "$program_path"
        python3 -u "$program_path" "$@"
      - |
        def run_info_fn(
            run_id,
        ):
            from base64 import urlsafe_b64encode
            from collections import namedtuple
            import datetime
            import base64
            import dill
            import kfp

            client = kfp.Client(host="http://ml-pipeline:8888")
            run_info = client.get_run(run_id=run_id)

            run_info_dict = {
                "run_id": run_info.run.id,
                "name": run_info.run.name,
                "created_at": run_info.run.created_at.isoformat(),
                "pipeline_id": run_info.run.pipeline_spec.pipeline_id,
            }

            # Track kubernetes resources associated wth the run.
            for r in run_info.run.resource_references:
                run_info_dict[f"{r.key.type.lower()}_id"] = r.key.id

            # Base64-encoded as value is visible in kubeflow ui.
            output = urlsafe_b64encode(dill.dumps(run_info_dict))

            return namedtuple("RunInfoOutput", ["run_info"])(
                str(output, encoding="ascii")
            )

        def _serialize_str(str_value: str) -> str:
            if not isinstance(str_value, str):
                raise TypeError('Value "{}" has type "{}" instead of str.'.format(
                    str(str_value), str(type(str_value))))
            return str_value

        import argparse
        _parser = argparse.ArgumentParser(prog='Run info fn', description='')
        _parser.add_argument("--run-id", dest="run_id", type=str, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
        _parsed_args = vars(_parser.parse_args())
        _output_files = _parsed_args.pop("_output_paths", [])

        _outputs = run_info_fn(**_parsed_args)

        _output_serializers = [
            _serialize_str,

        ]

        import os
        for idx, output_file in enumerate(_output_files):
            try:
                os.makedirs(os.path.dirname(output_file))
            except OSError:
                pass
            with open(output_file, 'w') as f:
                f.write(_output_serializers[idx](_outputs[idx]))
      image: python:3.7
    # The same file is exposed both as an output parameter (consumed by the DAG
    # tasks above) and as an output artifact.
    outputs:
      parameters:
      - name: run-info-fn-run_info
        valueFrom: {path: /tmp/outputs/run_info/data}
      artifacts:
      - {name: run-info-fn-run_info, path: /tmp/outputs/run_info/data}
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
      # component_spec is a JSON copy of the component definition, including the
      # same Python program as the command above.
      annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
          {"args": ["--run-id", {"inputValue": "run_id"}, "----output-paths", {"outputPath":
          "run_info"}], "command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1
          python3 -m pip install --quiet --no-warn-script-location ''kfp'' ''dill''
          || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
          ''kfp'' ''dill'' --user) && \"$0\" \"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf
          \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
          "def run_info_fn(\n    run_id,\n):\n    from base64 import urlsafe_b64encode\n    from
          collections import namedtuple\n    import datetime\n    import base64\n    import
          dill\n    import kfp\n\n    client = kfp.Client(host=\"http://ml-pipeline:8888\")\n    run_info
          = client.get_run(run_id=run_id)\n\n    run_info_dict = {\n        \"run_id\":
          run_info.run.id,\n        \"name\": run_info.run.name,\n        \"created_at\":
          run_info.run.created_at.isoformat(),\n        \"pipeline_id\": run_info.run.pipeline_spec.pipeline_id,\n    }\n\n    #
          Track kubernetes resources associated wth the run.\n    for r in run_info.run.resource_references:\n        run_info_dict[f\"{r.key.type.lower()}_id\"]
          = r.key.id\n\n    # Base64-encoded as value is visible in kubeflow ui.\n    output
          = urlsafe_b64encode(dill.dumps(run_info_dict))\n\n    return namedtuple(\"RunInfoOutput\",
          [\"run_info\"])(\n        str(output, encoding=\"ascii\")\n    )\n\ndef
          _serialize_str(str_value: str) -> str:\n    if not isinstance(str_value,
          str):\n        raise TypeError(''Value \"{}\" has type \"{}\" instead of
          str.''.format(\n            str(str_value), str(type(str_value))))\n    return
          str_value\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Run
          info fn'', description='''')\n_parser.add_argument(\"--run-id\", dest=\"run_id\",
          type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"----output-paths\",
          dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files
          = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = run_info_fn(**_parsed_args)\n\n_output_serializers
          = [\n    _serialize_str,\n\n]\n\nimport os\nfor idx, output_file in enumerate(_output_files):\n    try:\n        os.makedirs(os.path.dirname(output_file))\n    except
          OSError:\n        pass\n    with open(output_file, ''w'') as f:\n        f.write(_output_serializers[idx](_outputs[idx]))\n"],
          "image": "python:3.7"}}, "inputs": [{"name": "run_id", "type": "String"}],
          "name": "Run info fn", "outputs": [{"name": "run_info", "type": "String"}]}',
        pipelines.kubeflow.org/component_ref: '{}', pipelines.kubeflow.org/arguments.parameters: '{"run_id":
          "{{workflow.uid}}"}'}
- name: same-step-000-a29af0569ab04006b678357f767987db-fn
container:
args: [--input-context, /tmp/inputs/input_context/data, --run-info, '{{inputs.parameters.run-info-fn-run_info}}',
--metadata-url, '{{inputs.parameters.metadata_url}}', --output-context, /tmp/outputs/output_context/data]
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'dill' 'requests' 'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy'
'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests'
'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly'
'Requests' 'scipy' 'tensorflow' --user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def same_step_000_a29af0569ab04006b678357f767987db_fn(
input_context_path,
output_context_path,
run_info="gAR9lC4=",
metadata_url="",
):
from base64 import urlsafe_b64encode, urlsafe_b64decode
from pathlib import Path
import datetime
import requests
import tempfile
import dill
import os
input_context = None
with Path(input_context_path).open("rb") as reader:
input_context = reader.read()
# Helper function for posting metadata to mlflow.
def post_metadata(json):
if metadata_url == "":
return
try:
req = requests.post(metadata_url, json=json)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print(f"Error posting metadata: {err}")
# Move to writable directory as user might want to do file IO.
# TODO: won't persist across steps, might need support in SDK?
os.chdir(tempfile.mkdtemp())
# Load information about the current experiment run:
run_info = dill.loads(urlsafe_b64decode(run_info))
# Post session context to mlflow.
if len(input_context) > 0:
input_context_str = urlsafe_b64encode(input_context)
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_000",
"metadata_type": "input",
"metadata_value": input_context_str,
"metadata_time": datetime.datetime.now().isoformat(),
})
# User code for step, which we run in its own execution frame.
user_code = f"""
from base64 import urlsafe_b64decode
from pympler import asizeof
from tempfile import mktemp
import dill
import os
# Install dependencies from requirements file.
_req_file = mktemp()
with open(_req_file, "w") as _file:
_file.write(urlsafe_b64decode("bnVtcHkgPT0gMS4yMS4yCg==").decode())
os.system("pip install -r " + _req_file)
# Load the exploding variables module:
{urlsafe_b64decode("import dill


class BaseExplodingVariable:
    """Base class for ExplodingVariable."""

    def __setattr__(self, *args):
        raise self.err

    def __getattr__(self, attr):
        raise self.err

    def __delattr__(self, *args):
        raise self.err

    def __repr__(self, *args):
        raise self.err

    def __str__(self, *args):
        raise self.err

    def __unicode__(self, *args):
        raise self.err

    def __bytes__(self, *args):
        raise self.err

    def __format__(self, *args):
        raise self.err

    def __lt__(self, *args):
        raise self.err

    def __le__(self, *args):
        raise self.err

    def __eq__(self, *args):
        raise self.err

    def __ne__(self, *args):
        raise self.err

    def __gt__(self, *args):
        raise self.err

    def __ge__(self, *args):
        raise self.err

    def __hash__(self, *args):
        raise self.err

    def __bool__(self, *args):
        raise self.err

    def __dir__(self, *args):
        raise self.err

    def __get__(self, *args):
        raise self.err

    def __set__(self, *args):
        raise self.err

    def __delete__(self, *args):
        raise self.err

    def __set_name__(self, *args):
        raise self.err

    def __instancecheck__(self, *args):
        raise self.err

    def __subclasscheck__(self, *args):
        raise self.err

    def __call__(self, *args):
        raise self.err

    def __len__(self, *args):
        raise self.err

    def __length_hint__(self, *args):
        raise self.err

    def __getitem__(self, *args):
        raise self.err

    def __setitem__(self, *args):
        raise self.err

    def __delitem__(self, *args):
        raise self.err

    def __missing__(self, *args):
        raise self.err

    def __item__(self, *args):
        raise self.err

    def __reversed__(self, *args):
        raise self.err

    def __contains__(self, *args):
        raise self.err

    def __add__(self, *args):
        raise self.err

    def __sub__(self, *args):
        raise self.err

    def __mul__(self, *args):
        raise self.err

    def __matmul__(self, *args):
        raise self.err

    def __truediv__(self, *args):
        raise self.err

    def __floordiv__(self, *args):
        raise self.err

    def __mod__(self, *args):
        raise self.err

    def __divmod__(self, *args):
        raise self.err

    def __pow__(self, *args):
        raise self.err

    def __lshift__(self, *args):
        raise self.err

    def __rshift__(self, *args):
        raise self.err

    def __and__(self, *args):
        raise self.err

    def __xor__(self, *args):
        raise self.err

    def __or__(self, *args):
        raise self.err

    def __radd__(self, *args):
        raise self.err

    def __rsub__(self, *args):
        raise self.err

    def __rmul__(self, *args):
        raise self.err

    def __rmatmul__(self, *args):
        raise self.err

    def __rtruediv__(self, *args):
        raise self.err

    def __rfloordiv__(self, *args):
        raise self.err

    def __rmod__(self, *args):
        raise self.err

    def __rdivmod__(self, *args):
        raise self.err

    def __rpow__(self, *args):
        raise self.err

    def __rlshift__(self, *args):
        raise self.err

    def __rrshift__(self, *args):
        raise self.err

    def __rand__(self, *args):
        raise self.err

    def __rxor__(self, *args):
        raise self.err

    def __ror__(self, *args):
        raise self.err

    def __iadd__(self, *args):
        raise self.err

    def __isub__(self, *args):
        raise self.err

    def __imul__(self, *args):
        raise self.err

    def __imatmul__(self, *args):
        raise self.err

    def __itruediv__(self, *args):
        raise self.err

    def __ifloordiv__(self, *args):
        raise self.err

    def __imod__(self, *args):
        raise self.err

    def __ipow__(self, *args):
        raise self.err

    def __ilshift__(self, *args):
        raise self.err

    def __irshift__(self, *args):
        raise self.err

    def __iand__(self, *args):
        raise self.err

    def __ixor__(self, *args):
        raise self.err

    def __ior__(self, *args):
        raise self.err

    def __neg__(self, *args):
        raise self.err

    def __pos__(self, *args):
        raise self.err

    def __abs__(self, *args):
        raise self.err

    def __invert__(self, *args):
        raise self.err

    def __complex__(self, *args):
        raise self.err

    def __int__(self, *args):
        raise self.err

    def __float__(self, *args):
        raise self.err

    def __index__(self, *args):
        raise self.err

    def __round__(self, *args):
        raise self.err

    def __trunc__(self, *args):
        raise self.err

    def __floor__(self, *args):
        raise self.err

    def __ceil__(self, *args):
        raise self.err

    def __enter__(self, *args):
        raise self.err

    def __exit__(self, *args):
        raise self.err


class ExplodingVariable(BaseExplodingVariable):
    """
    Exploding variables raise an error when anything is done with them. They
    are used to patch variables that cannot be serialised for various reasons
    when passing context between containers for different SAME steps. The
    error should inform the user of the reason the variable cannot be
    serialised - dill might not support it, it may use too much memory etc.

    Our approach is to patch all special methods on the class:
    https://docs.python.org/3/reference/datamodel.html#special-method-names
    """

    def __init__(self, err):
        # Routed through the __setattr__ override below, which whitelists 'err'.
        self.err = err

    # Patch to allow setting 'self.err' in the constructor.
    def __setattr__(self, attr, value):
        if attr == "err":
            # Write straight into the instance dict to bypass the exploding
            # base-class __setattr__.
            self.__dict__[attr] = value
            return

        # super() is already bound to this instance, so `self` must not be
        # passed a second time. The base implementation raises the stored
        # error for every other attribute.
        super().__setattr__(attr, value)


# Custom deserialiser for ExplodingVariable that bypasses the explosion.
def deserialise(err):
    return ExplodingVariable(err)


# Custom serialiser for ExplodingVariable that bypasses the explosion.
@dill.register(ExplodingVariable)
def serialise(pickler, obj):
    pickler.save_reduce(deserialise, (obj.err,), obj=obj)
").decode()}
# Load session context into global namespace:
if { len(input_context) } > 0:
dill.load_session("{ input_context_path }")
# Run the user's notebook code:
{urlsafe_b64decode("ZGF0YXNldCA9ICdzYW1wbGVfZGF0YScKZ3B1X3R5cGUgPSAnQTEwMCcKaW1wb3J0IHRlbnNvcmZsb3cKaW1wb3J0IGRhdGV0aW1lCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IDEwCmIgPSBhICsgNSAjMTUKZnJvbSBJUHl0aG9uLmRpc3BsYXkgaW1wb3J0IEltYWdlCgp1cmwgPSAnaHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL1NBTUUtUHJvamVjdC9TQU1FLXNhbXBsZXMvbWFpbi90ZXN0LWFydGlmYWN0cy9GYXJvZUlzbGFuZHMuanBlZycKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjMjUKCmZyb20gSVB5dGhvbiBpbXBvcnQgZGlzcGxheQpkaXNwbGF5LkltYWdlKHVybCkKCmltcG9ydCBwbG90bHkKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgpkZWYgc29tZV9tYXRoKHgsIHopIC0-IHR1cGxlOgogICAgcmV0dXJuIChyb3VuZCh4ICsgeiwgMiksIHJvdW5kKHggLyB6LCAyKSkKCmEgPSBhICogMjAKYiA9IGIgKiAxMDAgIzI1MDAKCnByaW50KGYiQiA9IHtifSIp").decode()}
# Various types of serialisation failures we'd like to inform the user of:
def _cannot_pickle_msg(k):
return "Variable '" + k +"' was defined in previous steps, but could not be serialised as it was not supported by 'dill'."
def _too_large_msg(k, mem):
return "Variable '" + k + "' was defined in previous steps, but could not be serialised as it used too much memory (" + str(mem) + "B)."
# Replace unserialisable variables with exploding variables so that the user
# knows why they cannot use them in future steps:
_all_keys = list(globals().keys())
for _k in _all_keys:
if not dill.pickles(globals()[_k], safe=True):
globals()[_k] = ExplodingVariable(Exception(_cannot_pickle_msg(_k)))
continue
_mem = asizeof.asizeof(globals()[_k])
if _mem >= 52428800:
globals()[_k] = ExplodingVariable(Exception(_too_large_msg(_k, _mem)))
continue
# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""
# Runs the user code in a new execution frame. Context from the previous
# component in the run is loaded into the session dynamically, and we run
# with a single globals() namespace to simulate top-level execution.
exec(user_code, globals(), globals())
# Post new session context to mlflow:
with Path(output_context_path).open("rb") as reader:
context = urlsafe_b64encode(reader.read())
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_000",
"metadata_type": "output",
"metadata_value": context,
"metadata_time": datetime.datetime.now().isoformat(),
})
import argparse
_parser = argparse.ArgumentParser(prog='Same step 000 a29af0569ab04006b678357f767987db fn', description='')
_parser.add_argument("--input-context", dest="input_context_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--run-info", dest="run_info", type=str, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--metadata-url", dest="metadata_url", type=str, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--output-context", dest="output_context_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = same_step_000_a29af0569ab04006b678357f767987db_fn(**_parsed_args)
env:
- {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'}
- {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'}
- {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'}
- {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'}
image: library/python:3.9-slim-buster
inputs:
parameters:
- {name: AWS_ACCESS_KEY_ID}
- {name: AWS_SECRET_ACCESS_KEY}
- {name: MLFLOW_S3_ENDPOINT_URL}
- {name: MLFLOW_TRACKING_URI}
- {name: metadata_url}
- {name: run-info-fn-run_info}
artifacts:
- name: input_context
path: /tmp/inputs/input_context/data
raw: {data: ''}
outputs:
artifacts:
- {name: same-step-000-a29af0569ab04006b678357f767987db-fn-output_context, path: /tmp/outputs/output_context/data}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": ["--input-context", {"inputPath": "input_context"}, {"if": {"cond":
{"isPresent": "run_info"}, "then": ["--run-info", {"inputValue": "run_info"}]}},
{"if": {"cond": {"isPresent": "metadata_url"}, "then": ["--metadata-url",
{"inputValue": "metadata_url"}]}}, "--output-context", {"outputPath": "output_context"}],
"command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip
install --quiet --no-warn-script-location ''dill'' ''requests'' ''pympler==1.0.1''
''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' ''pandas'' ''plotly''
''Requests'' ''scipy'' ''tensorflow'' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location ''dill'' ''requests''
''pympler==1.0.1'' ''chart_studio'' ''ipython'' ''matplotlib'' ''numpy''
''pandas'' ''plotly'' ''Requests'' ''scipy'' ''tensorflow'' --user) && \"$0\"
\"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
-u \"$program_path\" \"$@\"\n", "def _make_parent_dirs_and_return_path(file_path:
str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return
file_path\n\ndef same_step_000_a29af0569ab04006b678357f767987db_fn(\n input_context_path,\n output_context_path,\n run_info=\"gAR9lC4=\",\n metadata_url=\"\",\n):\n from
base64 import urlsafe_b64encode, urlsafe_b64decode\n from pathlib import
Path\n import datetime\n import requests\n import tempfile\n import
dill\n import os\n\n input_context = None\n with Path(input_context_path).open(\"rb\")
as reader:\n input_context = reader.read()\n\n # Helper function
for posting metadata to mlflow.\n def post_metadata(json):\n if
metadata_url == \"\":\n return\n\n try:\n req
= requests.post(metadata_url, json=json)\n req.raise_for_status()\n except
requests.exceptions.HTTPError as err:\n print(f\"Error posting
metadata: {err}\")\n\n # Move to writable directory as user might want
to do file IO.\n # TODO: won''t persist across steps, might need support
in SDK?\n os.chdir(tempfile.mkdtemp())\n\n # Load information about
the current experiment run:\n run_info = dill.loads(urlsafe_b64decode(run_info))\n\n #
Post session context to mlflow.\n if len(input_context) > 0:\n input_context_str
= urlsafe_b64encode(input_context)\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_000\",\n \"metadata_type\": \"input\",\n \"metadata_value\":
input_context_str,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n #
User code for step, which we run in its own execution frame.\n user_code
= f\"\"\"\nfrom base64 import urlsafe_b64decode\nfrom pympler import asizeof\nfrom
tempfile import mktemp\nimport dill\nimport os\n\n# Install dependencies
from requirements file.\n_req_file = mktemp()\nwith open(_req_file, \"w\")
as _file:\n _file.write(urlsafe_b64decode(\"bnVtcHkgPT0gMS4yMS4yCg==\").decode())\nos.system(\"pip
install -r \" + _req_file)\n\n# Load the exploding variables module:\n{urlsafe_b64decode(\"import dill


class BaseExplodingVariable:
    """Base class for ExplodingVariable."""

    def __setattr__(self, *args):
        raise self.err

    def __getattr__(self, attr):
        raise self.err

    def __delattr__(self, *args):
        raise self.err

    def __repr__(self, *args):
        raise self.err

    def __str__(self, *args):
        raise self.err

    def __unicode__(self, *args):
        raise self.err

    def __bytes__(self, *args):
        raise self.err

    def __format__(self, *args):
        raise self.err

    def __lt__(self, *args):
        raise self.err

    def __le__(self, *args):
        raise self.err

    def __eq__(self, *args):
        raise self.err

    def __ne__(self, *args):
        raise self.err

    def __gt__(self, *args):
        raise self.err

    def __ge__(self, *args):
        raise self.err

    def __hash__(self, *args):
        raise self.err

    def __bool__(self, *args):
        raise self.err

    def __dir__(self, *args):
        raise self.err

    def __get__(self, *args):
        raise self.err

    def __set__(self, *args):
        raise self.err

    def __delete__(self, *args):
        raise self.err

    def __set_name__(self, *args):
        raise self.err

    def __instancecheck__(self, *args):
        raise self.err

    def __subclasscheck__(self, *args):
        raise self.err

    def __call__(self, *args):
        raise self.err

    def __len__(self, *args):
        raise self.err

    def __length_hint__(self, *args):
        raise self.err

    def __getitem__(self, *args):
        raise self.err

    def __setitem__(self, *args):
        raise self.err

    def __delitem__(self, *args):
        raise self.err

    def __missing__(self, *args):
        raise self.err

    def __item__(self, *args):
        raise self.err

    def __reversed__(self, *args):
        raise self.err

    def __contains__(self, *args):
        raise self.err

    def __add__(self, *args):
        raise self.err

    def __sub__(self, *args):
        raise self.err

    def __mul__(self, *args):
        raise self.err

    def __matmul__(self, *args):
        raise self.err

    def __truediv__(self, *args):
        raise self.err

    def __floordiv__(self, *args):
        raise self.err

    def __mod__(self, *args):
        raise self.err

    def __divmod__(self, *args):
        raise self.err

    def __pow__(self, *args):
        raise self.err

    def __lshift__(self, *args):
        raise self.err

    def __rshift__(self, *args):
        raise self.err

    def __and__(self, *args):
        raise self.err

    def __xor__(self, *args):
        raise self.err

    def __or__(self, *args):
        raise self.err

    def __radd__(self, *args):
        raise self.err

    def __rsub__(self, *args):
        raise self.err

    def __rmul__(self, *args):
        raise self.err

    def __rmatmul__(self, *args):
        raise self.err

    def __rtruediv__(self, *args):
        raise self.err

    def __rfloordiv__(self, *args):
        raise self.err

    def __rmod__(self, *args):
        raise self.err

    def __rdivmod__(self, *args):
        raise self.err

    def __rpow__(self, *args):
        raise self.err

    def __rlshift__(self, *args):
        raise self.err

    def __rrshift__(self, *args):
        raise self.err

    def __rand__(self, *args):
        raise self.err

    def __rxor__(self, *args):
        raise self.err

    def __ror__(self, *args):
        raise self.err

    def __iadd__(self, *args):
        raise self.err

    def __isub__(self, *args):
        raise self.err

    def __imul__(self, *args):
        raise self.err

    def __imatmul__(self, *args):
        raise self.err

    def __itruediv__(self, *args):
        raise self.err

    def __ifloordiv__(self, *args):
        raise self.err

    def __imod__(self, *args):
        raise self.err

    def __ipow__(self, *args):
        raise self.err

    def __ilshift__(self, *args):
        raise self.err

    def __irshift__(self, *args):
        raise self.err

    def __iand__(self, *args):
        raise self.err

    def __ixor__(self, *args):
        raise self.err

    def __ior__(self, *args):
        raise self.err

    def __neg__(self, *args):
        raise self.err

    def __pos__(self, *args):
        raise self.err

    def __abs__(self, *args):
        raise self.err

    def __invert__(self, *args):
        raise self.err

    def __complex__(self, *args):
        raise self.err

    def __int__(self, *args):
        raise self.err

    def __float__(self, *args):
        raise self.err

    def __index__(self, *args):
        raise self.err

    def __round__(self, *args):
        raise self.err

    def __trunc__(self, *args):
        raise self.err

    def __floor__(self, *args):
        raise self.err

    def __ceil__(self, *args):
        raise self.err

    def __enter__(self, *args):
        raise self.err

    def __exit__(self, *args):
        raise self.err


class ExplodingVariable(BaseExplodingVariable):
    """
    Exploding variables raise an error when anything is done with them. They
    are used to patch variables that cannot be serialised for various reasons
    when passing context between containers for different SAME steps. The
    error should inform the user of the reason the variable cannot be
    serialised - dill might not support it, it may use too much memory etc.

    Our approach is to patch all special methods on the class:
    https://docs.python.org/3/reference/datamodel.html#special-method-names
    """

    def __init__(self, err):
        self.err = err

    # Patch to allow setting 'self.err' in the constructor.
    def __setattr__(self, attr, value):
        if attr == "err":
            self.__dict__[attr] = value
            return

        super().__setattr__(self, attr, value)


# Custom deserialiser for ExplodingVariable that bypasses the explosion.
def deserialise(err):
    return ExplodingVariable(err)


# Custom serialiser for ExplodingVariable that bypasses the explosion.
@dill.register(ExplodingVariable)
def serialise(pickler, obj):
    """Reduce an ExplodingVariable to a call of ``deserialise`` on its error.

    Args:
        pickler: The pickler whose ``save_reduce`` records the reduction.
        obj: The ExplodingVariable being saved; only ``obj.err`` is read.
    """
    constructor_args = (obj.err,)
    pickler.save_reduce(deserialise, constructor_args, obj=obj)
\").decode()}\n\n#
Load session context into global namespace:\nif { len(input_context) } >
0:\n dill.load_session(\"{ input_context_path }\")\n\n# Run the user''s
notebook code:\n{urlsafe_b64decode(\"ZGF0YXNldCA9ICdzYW1wbGVfZGF0YScKZ3B1X3R5cGUgPSAnQTEwMCcKaW1wb3J0IHRlbnNvcmZsb3cKaW1wb3J0IGRhdGV0aW1lCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IDEwCmIgPSBhICsgNSAjMTUKZnJvbSBJUHl0aG9uLmRpc3BsYXkgaW1wb3J0IEltYWdlCgp1cmwgPSAnaHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL1NBTUUtUHJvamVjdC9TQU1FLXNhbXBsZXMvbWFpbi90ZXN0LWFydGlmYWN0cy9GYXJvZUlzbGFuZHMuanBlZycKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjMjUKCmZyb20gSVB5dGhvbiBpbXBvcnQgZGlzcGxheQpkaXNwbGF5LkltYWdlKHVybCkKCmltcG9ydCBwbG90bHkKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgpkZWYgc29tZV9tYXRoKHgsIHopIC0-IHR1cGxlOgogICAgcmV0dXJuIChyb3VuZCh4ICsgeiwgMiksIHJvdW5kKHggLyB6LCAyKSkKCmEgPSBhICogMjAKYiA9IGIgKiAxMDAgIzI1MDAKCnByaW50KGYiQiA9IHtifSIp\").decode()}\n\n#
Various types of serialisation failures we''d like to inform the user of:\ndef
_cannot_pickle_msg(k):\n return \"Variable ''\" + k +\"'' was defined
in previous steps, but could not be serialised as it was not supported by
''dill''.\"\n\ndef _too_large_msg(k, mem):\n return \"Variable ''\" +
k + \"'' was defined in previous steps, but could not be serialised as it
used too much memory (\" + str(mem) + \"B).\"\n\n# Replace unserialisable
variables with exploding variables so that the user\n# knows why they cannot
use them in future steps:\n_all_keys = list(globals().keys())\nfor _k in
_all_keys:\n if not dill.pickles(globals()[_k], safe=True):\n globals()[_k]
= ExplodingVariable(Exception(_cannot_pickle_msg(_k)))\n continue\n\n _mem
= asizeof.asizeof(globals()[_k])\n if _mem >= 52428800:\n globals()[_k]
= ExplodingVariable(Exception(_too_large_msg(_k, _mem)))\n continue\n\n#
Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n #
Runs the user code in a new execution frame. Context from the previous\n #
component in the run is loaded into the session dynamically, and we run\n #
with a single globals() namespace to simulate top-level execution.\n exec(user_code,
globals(), globals())\n\n # Post new session context to mlflow:\n with
Path(output_context_path).open(\"rb\") as reader:\n context = urlsafe_b64encode(reader.read())\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_000\",\n \"metadata_type\": \"output\",\n \"metadata_value\":
context,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\nimport
argparse\n_parser = argparse.ArgumentParser(prog=''Same step 000 a29af0569ab04006b678357f767987db
fn'', description='''')\n_parser.add_argument(\"--input-context\", dest=\"input_context_path\",
type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--run-info\",
dest=\"run_info\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--metadata-url\",
dest=\"metadata_url\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-context\",
dest=\"output_context_path\", type=_make_parent_dirs_and_return_path, required=True,
default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs
= same_step_000_a29af0569ab04006b678357f767987db_fn(**_parsed_args)\n"],
"image": "library/python:3.9-slim-buster"}}, "inputs": [{"name": "input_context",
"type": "String"}, {"default": "gAR9lC4=", "name": "run_info", "optional":
true}, {"default": "", "name": "metadata_url", "optional": true}], "name":
"Same step 000 a29af0569ab04006b678357f767987db fn", "outputs": [{"name":
"output_context", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{}',
pipelines.kubeflow.org/arguments.parameters: '{"metadata_url": "{{inputs.parameters.metadata_url}}",
"run_info": "{{inputs.parameters.run-info-fn-run_info}}"}', pipelines.kubeflow.org/max_cache_staleness: P0D}
- name: same-step-001-60ad85fb69284d98992da58abd28344c-fn
container:
args: [--input-context, /tmp/inputs/input_context/data, --run-info, '{{inputs.parameters.run-info-fn-run_info}}',
--metadata-url, '{{inputs.parameters.metadata_url}}', --output-context, /tmp/outputs/output_context/data]
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'dill' 'requests' 'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy'
'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests'
'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly'
'Requests' 'scipy' 'tensorflow' --user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def same_step_001_60ad85fb69284d98992da58abd28344c_fn(
input_context_path,
output_context_path,
run_info="gAR9lC4=",
metadata_url="",
):
from base64 import urlsafe_b64encode, urlsafe_b64decode
from pathlib import Path
import datetime
import requests
import tempfile
import dill
import os
input_context = None
with Path(input_context_path).open("rb") as reader:
input_context = reader.read()
# Helper function for posting metadata to mlflow.
def post_metadata(json):
    """Best-effort POST of one metadata record to ``metadata_url``.

    Does nothing when ``metadata_url`` is the empty string. A failure
    reported by ``requests`` is printed rather than raised.

    Args:
        json: Payload passed to ``requests.post`` as its ``json`` argument.
    """
    if metadata_url == "":
        return

    try:
        req = requests.post(metadata_url, json=json)
        req.raise_for_status()
    # RequestException is the common base of HTTPError, ConnectionError and
    # Timeout, so an unreachable endpoint is reported the same way as a bad
    # status instead of aborting the step.
    except requests.exceptions.RequestException as err:
        print(f"Error posting metadata: {err}")
# Move to writable directory as user might want to do file IO.
# TODO: won't persist across steps, might need support in SDK?
os.chdir(tempfile.mkdtemp())
# Load information about the current experiment run:
run_info = dill.loads(urlsafe_b64decode(run_info))
# Post session context to mlflow.
if len(input_context) > 0:
input_context_str = urlsafe_b64encode(input_context)
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_001",
"metadata_type": "input",
"metadata_value": input_context_str,
"metadata_time": datetime.datetime.now().isoformat(),
})
# User code for step, which we run in its own execution frame.
user_code = f"""
from base64 import urlsafe_b64decode
from pympler import asizeof
from tempfile import mktemp
import dill
import os
# Install dependencies from requirements file.
_req_file = mktemp()
with open(_req_file, "w") as _file:
_file.write(urlsafe_b64decode("bnVtcHkgPT0gMS4yMS4yCg==").decode())
os.system("pip install -r " + _req_file)
# Load the exploding variables module:
{urlsafe_b64decode("import dill


class BaseExplodingVariable:
    """Base class for ExplodingVariable.

    Every special method defined here raises the exception stored in the
    instance attribute ``err``. This class never sets ``err`` itself; a
    subclass has to place it in the instance ``__dict__``.
    """

    def __getattr__(self, attr):
        # Only reached when normal lookup fails. If "err" itself is missing
        # (an instance created without __init__), the "raise self.err" below
        # would re-enter this method until RecursionError, so report the
        # missing attribute instead.
        if attr == "err":
            raise AttributeError(attr)
        raise self.err

    def _explode(self, *args):
        # Shared body for every special method aliased below.
        raise self.err

    # Attribute access.
    __setattr__ = __delattr__ = __dir__ = _explode

    # String conversion.
    __repr__ = __str__ = __unicode__ = __bytes__ = __format__ = _explode

    # Comparison, hashing and truthiness.
    __lt__ = __le__ = __eq__ = __ne__ = __gt__ = __ge__ = _explode
    __hash__ = __bool__ = _explode

    # Descriptor protocol and class checks.
    __get__ = __set__ = __delete__ = __set_name__ = _explode
    __instancecheck__ = __subclasscheck__ = _explode

    # Calling and container protocol.
    __call__ = __len__ = __length_hint__ = _explode
    __getitem__ = __setitem__ = __delitem__ = __missing__ = _explode
    __item__ = __reversed__ = __contains__ = _explode

    # Binary operators.
    __add__ = __sub__ = __mul__ = __matmul__ = _explode
    __truediv__ = __floordiv__ = __mod__ = __divmod__ = __pow__ = _explode
    __lshift__ = __rshift__ = __and__ = __xor__ = __or__ = _explode

    # Reflected binary operators.
    __radd__ = __rsub__ = __rmul__ = __rmatmul__ = _explode
    __rtruediv__ = __rfloordiv__ = __rmod__ = __rdivmod__ = _explode
    __rpow__ = __rlshift__ = __rrshift__ = _explode
    __rand__ = __rxor__ = __ror__ = _explode

    # In-place binary operators.
    __iadd__ = __isub__ = __imul__ = __imatmul__ = _explode
    __itruediv__ = __ifloordiv__ = __imod__ = __ipow__ = _explode
    __ilshift__ = __irshift__ = __iand__ = __ixor__ = __ior__ = _explode

    # Unary operators and numeric conversion.
    __neg__ = __pos__ = __abs__ = __invert__ = _explode
    __complex__ = __int__ = __float__ = __index__ = _explode
    __round__ = __trunc__ = __floor__ = __ceil__ = _explode

    # Context manager protocol.
    __enter__ = __exit__ = _explode

    # Keep the helper name out of the class namespace so looking it up on an
    # instance still goes through __getattr__ and raises.
    del _explode


class ExplodingVariable(BaseExplodingVariable):
    """
    Exploding variables raise an error when anything is done with them. They
    are used to patch variables that cannot be serialised for various reasons
    when passing context between containers for different SAME steps. The
    error should inform the user of the reason the variable cannot be
    serialised - dill might not support it, it may use too much memory etc.

    Our approach is to patch all special methods on the class:
    https://docs.python.org/3/reference/datamodel.html#special-method-names

    Args:
        err: The exception instance raised by every patched operation.
    """

    def __init__(self, err):
        self.err = err

    # Patch to allow setting 'self.err' in the constructor.
    def __setattr__(self, attr, value):
        if attr == "err":
            # Write straight into the instance dict so the base class
            # __setattr__, which always raises, is bypassed for this one name.
            self.__dict__[attr] = value
            return

        # super() already binds self; passing it again shifted every argument
        # by one position.
        super().__setattr__(attr, value)


# Custom deserialiser for ExplodingVariable that bypasses the explosion.
def deserialise(err):
    """Build a fresh ExplodingVariable wrapping the given exception.

    Args:
        err: The exception the rebuilt variable raises when used.

    Returns:
        A new ExplodingVariable constructed from ``err``.
    """
    rebuilt = ExplodingVariable(err)
    return rebuilt


# Custom serialiser for ExplodingVariable that bypasses the explosion.
@dill.register(ExplodingVariable)
def serialise(pickler, obj):
    """Reduce an ExplodingVariable to a call of ``deserialise`` on its error.

    Args:
        pickler: The pickler whose ``save_reduce`` records the reduction.
        obj: The ExplodingVariable being saved; only ``obj.err`` is read.
    """
    constructor_args = (obj.err,)
    pickler.save_reduce(deserialise, constructor_args, obj=obj)
").decode()}
# Load session context into global namespace:
if { len(input_context) } > 0:
dill.load_session("{ input_context_path }")
# Run the user's notebook code:
{urlsafe_b64decode("aW1wb3J0IG51bXB5IGFzIG5wCmltcG9ydCBtYXRwbG90bGliLnB5cGxvdCBhcyBwbHQKaW1wb3J0IHNjaXB5LnN0YXRzIGFzIHN0YXRzCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKbXUgPSAwCnN0ZCA9IDEKCnggPSBucC5saW5zcGFjZShzdGFydD0tNCwgc3RvcD00LCBudW09MTAwKQp5ID0gc3RhdHMubm9ybS5wZGYoeCwgbXUsIHN0ZCkKCmEgPSBhICsgNQpiID0gYiArIDEwICMgMjUxNSAKCnBsdC5wbG90KHgsIHkpCnBsdC5zaG93KCkKaW1wb3J0IHJlcXVlc3RzCmltcG9ydCBwYW5kYXMgYXMgcGQKaW1wb3J0IHBsb3RseS5maWd1cmVfZmFjdG9yeSBhcyBmZgppbXBvcnQgY2hhcnRfc3R1ZGlvLnBsb3RseSBhcyBweQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCnVybCA9ICdodHRwczovL3Jhdy5naXRodWJ1c2VyY29udGVudC5jb20vU0FNRS1Qcm9qZWN0L1NBTUUtc2FtcGxlcy9tYWluL3Rlc3QtYXJ0aWZhY3RzL3Rlc3QuY3N2JwpkZiA9IHBkLnJlYWRfY3N2KHVybCkKCmEgPSBhICogMTAwMApiID0gYiAvIDY3ICMgMzcuNTM3MzEzNDMyOAoKZGYuZGVzY3JpYmUoKQ==").decode()}
# Various types of serialisation failures we'd like to inform the user of:
def _cannot_pickle_msg(k):
return "Variable '" + k +"' was defined in previous steps, but could not be serialised as it was not supported by 'dill'."
def _too_large_msg(k, mem):
return "Variable '" + k + "' was defined in previous steps, but could not be serialised as it used too much memory (" + str(mem) + "B)."
# Replace unserialisable variables with exploding variables so that the user
# knows why they cannot use them in future steps:
_all_keys = list(globals().keys())
for _k in _all_keys:
if not dill.pickles(globals()[_k], safe=True):
globals()[_k] = ExplodingVariable(Exception(_cannot_pickle_msg(_k)))
continue
_mem = asizeof.asizeof(globals()[_k])
if _mem >= 52428800:
globals()[_k] = ExplodingVariable(Exception(_too_large_msg(_k, _mem)))
continue
# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""
# Runs the user code in a new execution frame. Context from the previous
# component in the run is loaded into the session dynamically, and we run
# with a single globals() namespace to simulate top-level execution.
exec(user_code, globals(), globals())
# Post new session context to mlflow:
with Path(output_context_path).open("rb") as reader:
context = urlsafe_b64encode(reader.read())
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_001",
"metadata_type": "output",
"metadata_value": context,
"metadata_time": datetime.datetime.now().isoformat(),
})
import argparse
_parser = argparse.ArgumentParser(prog='Same step 001 60ad85fb69284d98992da58abd28344c fn', description='')
_parser.add_argument("--input-context", dest="input_context_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--run-info", dest="run_info", type=str, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--metadata-url", dest="metadata_url", type=str, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--output-context", dest="output_context_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = same_step_001_60ad85fb69284d98992da58abd28344c_fn(**_parsed_args)
env:
- {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'}
- {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'}
- {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'}
- {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'}
image: library/python:3.9-slim-buster
inputs:
parameters:
- {name: AWS_ACCESS_KEY_ID}
- {name: AWS_SECRET_ACCESS_KEY}
- {name: MLFLOW_S3_ENDPOINT_URL}
- {name: MLFLOW_TRACKING_URI}
- {name: metadata_url}
- {name: run-info-fn-run_info}
artifacts:
- {name: same-step-000-a29af0569ab04006b678357f767987db-fn-output_context, path: /tmp/inputs/input_context/data}
outputs:
artifacts:
- {name: same-step-001-60ad85fb69284d98992da58abd28344c-fn-output_context, path: /tmp/outputs/output_context/data}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": ["--input-context", {"inputPath": "input_context"}, {"if": {"cond":
{"isPresent": "run_info"}, "then": ["--run-info", {"inputValue": "run_info"}]}},
{"if": {"cond": {"isPresent": "metadata_url"}, "then": ["--metadata-url",
{"inputValue": "metadata_url"}]}}, "--output-context", {"outputPath": "output_context"}],
"command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip
install --quiet --no-warn-script-location ''dill'' ''requests'' ''pympler==1.0.1''
''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' ''pandas'' ''plotly''
''Requests'' ''scipy'' ''tensorflow'' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location ''dill'' ''requests''
''pympler==1.0.1'' ''chart_studio'' ''ipython'' ''matplotlib'' ''numpy''
''pandas'' ''plotly'' ''Requests'' ''scipy'' ''tensorflow'' --user) && \"$0\"
\"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
-u \"$program_path\" \"$@\"\n", "def _make_parent_dirs_and_return_path(file_path:
str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return
file_path\n\ndef same_step_001_60ad85fb69284d98992da58abd28344c_fn(\n input_context_path,\n output_context_path,\n run_info=\"gAR9lC4=\",\n metadata_url=\"\",\n):\n from
base64 import urlsafe_b64encode, urlsafe_b64decode\n from pathlib import
Path\n import datetime\n import requests\n import tempfile\n import
dill\n import os\n\n input_context = None\n with Path(input_context_path).open(\"rb\")
as reader:\n input_context = reader.read()\n\n # Helper function
for posting metadata to mlflow.\n def post_metadata(json):\n if
metadata_url == \"\":\n return\n\n try:\n req
= requests.post(metadata_url, json=json)\n req.raise_for_status()\n except
requests.exceptions.HTTPError as err:\n print(f\"Error posting
metadata: {err}\")\n\n # Move to writable directory as user might want
to do file IO.\n # TODO: won''t persist across steps, might need support
in SDK?\n os.chdir(tempfile.mkdtemp())\n\n # Load information about
the current experiment run:\n run_info = dill.loads(urlsafe_b64decode(run_info))\n\n #
Post session context to mlflow.\n if len(input_context) > 0:\n input_context_str
= urlsafe_b64encode(input_context)\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_001\",\n \"metadata_type\": \"input\",\n \"metadata_value\":
input_context_str,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n #
User code for step, which we run in its own execution frame.\n user_code
= f\"\"\"\nfrom base64 import urlsafe_b64decode\nfrom pympler import asizeof\nfrom
tempfile import mktemp\nimport dill\nimport os\n\n# Install dependencies
from requirements file.\n_req_file = mktemp()\nwith open(_req_file, \"w\")
as _file:\n _file.write(urlsafe_b64decode(\"bnVtcHkgPT0gMS4yMS4yCg==\").decode())\nos.system(\"pip
install -r \" + _req_file)\n\n# Load the exploding variables module:\n{urlsafe_b64decode(\"import dill


class BaseExplodingVariable:
    """Base class for ExplodingVariable.

    Every special method defined here raises the exception stored in the
    instance attribute ``err``. This class never sets ``err`` itself; a
    subclass has to place it in the instance ``__dict__``.
    """

    def __getattr__(self, attr):
        # Only reached when normal lookup fails. If "err" itself is missing
        # (an instance created without __init__), the "raise self.err" below
        # would re-enter this method until RecursionError, so report the
        # missing attribute instead.
        if attr == "err":
            raise AttributeError(attr)
        raise self.err

    def _explode(self, *args):
        # Shared body for every special method aliased below.
        raise self.err

    # Attribute access.
    __setattr__ = __delattr__ = __dir__ = _explode

    # String conversion.
    __repr__ = __str__ = __unicode__ = __bytes__ = __format__ = _explode

    # Comparison, hashing and truthiness.
    __lt__ = __le__ = __eq__ = __ne__ = __gt__ = __ge__ = _explode
    __hash__ = __bool__ = _explode

    # Descriptor protocol and class checks.
    __get__ = __set__ = __delete__ = __set_name__ = _explode
    __instancecheck__ = __subclasscheck__ = _explode

    # Calling and container protocol.
    __call__ = __len__ = __length_hint__ = _explode
    __getitem__ = __setitem__ = __delitem__ = __missing__ = _explode
    __item__ = __reversed__ = __contains__ = _explode

    # Binary operators.
    __add__ = __sub__ = __mul__ = __matmul__ = _explode
    __truediv__ = __floordiv__ = __mod__ = __divmod__ = __pow__ = _explode
    __lshift__ = __rshift__ = __and__ = __xor__ = __or__ = _explode

    # Reflected binary operators.
    __radd__ = __rsub__ = __rmul__ = __rmatmul__ = _explode
    __rtruediv__ = __rfloordiv__ = __rmod__ = __rdivmod__ = _explode
    __rpow__ = __rlshift__ = __rrshift__ = _explode
    __rand__ = __rxor__ = __ror__ = _explode

    # In-place binary operators.
    __iadd__ = __isub__ = __imul__ = __imatmul__ = _explode
    __itruediv__ = __ifloordiv__ = __imod__ = __ipow__ = _explode
    __ilshift__ = __irshift__ = __iand__ = __ixor__ = __ior__ = _explode

    # Unary operators and numeric conversion.
    __neg__ = __pos__ = __abs__ = __invert__ = _explode
    __complex__ = __int__ = __float__ = __index__ = _explode
    __round__ = __trunc__ = __floor__ = __ceil__ = _explode

    # Context manager protocol.
    __enter__ = __exit__ = _explode

    # Keep the helper name out of the class namespace so looking it up on an
    # instance still goes through __getattr__ and raises.
    del _explode


class ExplodingVariable(BaseExplodingVariable):
    """
    Exploding variables raise an error when anything is done with them. They
    are used to patch variables that cannot be serialised for various reasons
    when passing context between containers for different SAME steps. The
    error should inform the user of the reason the variable cannot be
    serialised - dill might not support it, it may use too much memory etc.

    Our approach is to patch all special methods on the class:
    https://docs.python.org/3/reference/datamodel.html#special-method-names

    Args:
        err: The exception instance raised by every patched operation.
    """

    def __init__(self, err):
        self.err = err

    # Patch to allow setting 'self.err' in the constructor.
    def __setattr__(self, attr, value):
        if attr == "err":
            # Write straight into the instance dict so the base class
            # __setattr__, which always raises, is bypassed for this one name.
            self.__dict__[attr] = value
            return

        # super() already binds self; passing it again shifted every argument
        # by one position.
        super().__setattr__(attr, value)


# Custom deserialiser for ExplodingVariable that bypasses the explosion.
def deserialise(err):
    """Build a fresh ExplodingVariable wrapping the given exception.

    Args:
        err: The exception the rebuilt variable raises when used.

    Returns:
        A new ExplodingVariable constructed from ``err``.
    """
    rebuilt = ExplodingVariable(err)
    return rebuilt


# Custom serialiser for ExplodingVariable that bypasses the explosion.
@dill.register(ExplodingVariable)
def serialise(pickler, obj):
    """Reduce an ExplodingVariable to a call of ``deserialise`` on its error.

    Args:
        pickler: The pickler whose ``save_reduce`` records the reduction.
        obj: The ExplodingVariable being saved; only ``obj.err`` is read.
    """
    constructor_args = (obj.err,)
    pickler.save_reduce(deserialise, constructor_args, obj=obj)
\").decode()}\n\n#
Load session context into global namespace:\nif { len(input_context) } >
0:\n dill.load_session(\"{ input_context_path }\")\n\n# Run the user''s
notebook code:\n{urlsafe_b64decode(\"aW1wb3J0IG51bXB5IGFzIG5wCmltcG9ydCBtYXRwbG90bGliLnB5cGxvdCBhcyBwbHQKaW1wb3J0IHNjaXB5LnN0YXRzIGFzIHN0YXRzCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKbXUgPSAwCnN0ZCA9IDEKCnggPSBucC5saW5zcGFjZShzdGFydD0tNCwgc3RvcD00LCBudW09MTAwKQp5ID0gc3RhdHMubm9ybS5wZGYoeCwgbXUsIHN0ZCkKCmEgPSBhICsgNQpiID0gYiArIDEwICMgMjUxNSAKCnBsdC5wbG90KHgsIHkpCnBsdC5zaG93KCkKaW1wb3J0IHJlcXVlc3RzCmltcG9ydCBwYW5kYXMgYXMgcGQKaW1wb3J0IHBsb3RseS5maWd1cmVfZmFjdG9yeSBhcyBmZgppbXBvcnQgY2hhcnRfc3R1ZGlvLnBsb3RseSBhcyBweQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCnVybCA9ICdodHRwczovL3Jhdy5naXRodWJ1c2VyY29udGVudC5jb20vU0FNRS1Qcm9qZWN0L1NBTUUtc2FtcGxlcy9tYWluL3Rlc3QtYXJ0aWZhY3RzL3Rlc3QuY3N2JwpkZiA9IHBkLnJlYWRfY3N2KHVybCkKCmEgPSBhICogMTAwMApiID0gYiAvIDY3ICMgMzcuNTM3MzEzNDMyOAoKZGYuZGVzY3JpYmUoKQ==\").decode()}\n\n#
Various types of serialisation failures we''d like to inform the user of:\ndef
_cannot_pickle_msg(k):\n return \"Variable ''\" + k +\"'' was defined
in previous steps, but could not be serialised as it was not supported by
''dill''.\"\n\ndef _too_large_msg(k, mem):\n return \"Variable ''\" +
k + \"'' was defined in previous steps, but could not be serialised as it
used too much memory (\" + str(mem) + \"B).\"\n\n# Replace unserialisable
variables with exploding variables so that the user\n# knows why they cannot
use them in future steps:\n_all_keys = list(globals().keys())\nfor _k in
_all_keys:\n if not dill.pickles(globals()[_k], safe=True):\n globals()[_k]
= ExplodingVariable(Exception(_cannot_pickle_msg(_k)))\n continue\n\n _mem
= asizeof.asizeof(globals()[_k])\n if _mem >= 52428800:\n globals()[_k]
= ExplodingVariable(Exception(_too_large_msg(_k, _mem)))\n continue\n\n#
Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n #
Runs the user code in a new execution frame. Context from the previous\n #
component in the run is loaded into the session dynamically, and we run\n #
with a single globals() namespace to simulate top-level execution.\n exec(user_code,
globals(), globals())\n\n # Post new session context to mlflow:\n with
Path(output_context_path).open(\"rb\") as reader:\n context = urlsafe_b64encode(reader.read())\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_001\",\n \"metadata_type\": \"output\",\n \"metadata_value\":
context,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\nimport
argparse\n_parser = argparse.ArgumentParser(prog=''Same step 001 60ad85fb69284d98992da58abd28344c
fn'', description='''')\n_parser.add_argument(\"--input-context\", dest=\"input_context_path\",
type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--run-info\",
dest=\"run_info\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--metadata-url\",
dest=\"metadata_url\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-context\",
dest=\"output_context_path\", type=_make_parent_dirs_and_return_path, required=True,
default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs
= same_step_001_60ad85fb69284d98992da58abd28344c_fn(**_parsed_args)\n"],
"image": "library/python:3.9-slim-buster"}}, "inputs": [{"name": "input_context",
"type": "String"}, {"default": "gAR9lC4=", "name": "run_info", "optional":
true}, {"default": "", "name": "metadata_url", "optional": true}], "name":
"Same step 001 60ad85fb69284d98992da58abd28344c fn", "outputs": [{"name":
"output_context", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{}',
pipelines.kubeflow.org/arguments.parameters: '{"metadata_url": "{{inputs.parameters.metadata_url}}",
"run_info": "{{inputs.parameters.run-info-fn-run_info}}"}', pipelines.kubeflow.org/max_cache_staleness: P0D}
- name: same-step-002-b7dbb05511774bd7bab385301d43b388-fn
container:
args: [--input-context, /tmp/inputs/input_context/data, --run-info, '{{inputs.parameters.run-info-fn-run_info}}',
--metadata-url, '{{inputs.parameters.metadata_url}}', --output-context, /tmp/outputs/output_context/data]
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'dill' 'requests' 'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy'
'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests'
'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly'
'Requests' 'scipy' 'tensorflow' --user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def same_step_002_b7dbb05511774bd7bab385301d43b388_fn(
input_context_path,
output_context_path,
run_info="gAR9lC4=",
metadata_url="",
):
from base64 import urlsafe_b64encode, urlsafe_b64decode
from pathlib import Path
import datetime
import requests
import tempfile
import dill
import os
input_context = None
with Path(input_context_path).open("rb") as reader:
input_context = reader.read()
# Helper function for posting metadata to mlflow.
def post_metadata(json):
    # Send one metadata record (a dict) to metadata_url as a JSON request
    # body. Failures are printed and never abort the step.
    if metadata_url == "":
        # No endpoint configured, nothing to send.
        return
    # The callers pass the output of urlsafe_b64encode, which is bytes, as
    # "metadata_value"; the JSON encoder rejects bytes, so turn such values
    # into text before posting.
    payload = {
        key: value.decode("ascii") if isinstance(value, bytes) else value
        for key, value in json.items()
    }
    try:
        req = requests.post(metadata_url, json=payload)
        req.raise_for_status()
    except requests.exceptions.RequestException as err:
        # RequestException is the base class of HTTPError and also covers
        # connection failures and timeouts, which would otherwise propagate
        # and fail the whole step.
        print(f"Error posting metadata: {err}")
# Move to writable directory as user might want to do file IO.
# TODO: won't persist across steps, might need support in SDK?
os.chdir(tempfile.mkdtemp())
# Load information about the current experiment run:
run_info = dill.loads(urlsafe_b64decode(run_info))
# Post session context to mlflow.
if len(input_context) > 0:
input_context_str = urlsafe_b64encode(input_context)
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_002",
"metadata_type": "input",
"metadata_value": input_context_str,
"metadata_time": datetime.datetime.now().isoformat(),
})
# User code for step, which we run in its own execution frame.
user_code = f"""
from base64 import urlsafe_b64decode
from pympler import asizeof
from tempfile import mktemp
import dill
import os
# Install dependencies from requirements file.
_req_file = mktemp()
with open(_req_file, "w") as _file:
_file.write(urlsafe_b64decode("bnVtcHkgPT0gMS4yMS4yCg==").decode())
os.system("pip install -r " + _req_file)
# Load the exploding variables module:
{urlsafe_b64decode("import dill


class BaseExplodingVariable:
    """Base class for ExplodingVariable."""

    # Every special method below does the same thing: it raises the exception
    # object stored on the instance as 'err'. This class never assigns 'err'
    # itself; the ExplodingVariable subclass does so in its constructor.

    # Attribute access hooks.
    def __setattr__(self, *args):
        raise self.err

    # Python calls __getattr__ only after normal attribute lookup has failed.
    # NOTE(review): if 'err' is ever absent from the instance, the self.err
    # below re-enters this method and recurses - confirm 'err' is always set.
    def __getattr__(self, attr):
        raise self.err

    def __delattr__(self, *args):
        raise self.err

    # String conversion and formatting.
    def __repr__(self, *args):
        raise self.err

    def __str__(self, *args):
        raise self.err

    # NOTE(review): __unicode__ is a Python 2 hook; Python 3 does not appear
    # to call it - confirm whether it is still needed.
    def __unicode__(self, *args):
        raise self.err

    def __bytes__(self, *args):
        raise self.err

    def __format__(self, *args):
        raise self.err

    # Rich comparisons, hashing and truth value.
    def __lt__(self, *args):
        raise self.err

    def __le__(self, *args):
        raise self.err

    def __eq__(self, *args):
        raise self.err

    def __ne__(self, *args):
        raise self.err

    def __gt__(self, *args):
        raise self.err

    def __ge__(self, *args):
        raise self.err

    def __hash__(self, *args):
        raise self.err

    def __bool__(self, *args):
        raise self.err

    # dir() support.
    def __dir__(self, *args):
        raise self.err

    # Descriptor protocol.
    def __get__(self, *args):
        raise self.err

    def __set__(self, *args):
        raise self.err

    def __delete__(self, *args):
        raise self.err

    def __set_name__(self, *args):
        raise self.err

    # Class-check hooks.
    # NOTE(review): the data model documents these as being looked up on the
    # metaclass, so defining them on an ordinary class presumably has no
    # effect on isinstance() / issubclass() - confirm.
    def __instancecheck__(self, *args):
        raise self.err

    def __subclasscheck__(self, *args):
        raise self.err

    # Calling and container emulation.
    def __call__(self, *args):
        raise self.err

    def __len__(self, *args):
        raise self.err

    def __length_hint__(self, *args):
        raise self.err

    def __getitem__(self, *args):
        raise self.err

    def __setitem__(self, *args):
        raise self.err

    def __delitem__(self, *args):
        raise self.err

    def __missing__(self, *args):
        raise self.err

    # NOTE(review): __item__ is not among the special method names in the
    # Python data model; possibly a typo for __iter__, which this class does
    # not define - confirm.
    def __item__(self, *args):
        raise self.err

    def __reversed__(self, *args):
        raise self.err

    def __contains__(self, *args):
        raise self.err

    # Binary arithmetic and bitwise operators.
    def __add__(self, *args):
        raise self.err

    def __sub__(self, *args):
        raise self.err

    def __mul__(self, *args):
        raise self.err

    def __matmul__(self, *args):
        raise self.err

    def __truediv__(self, *args):
        raise self.err

    def __floordiv__(self, *args):
        raise self.err

    def __mod__(self, *args):
        raise self.err

    def __divmod__(self, *args):
        raise self.err

    def __pow__(self, *args):
        raise self.err

    def __lshift__(self, *args):
        raise self.err

    def __rshift__(self, *args):
        raise self.err

    def __and__(self, *args):
        raise self.err

    def __xor__(self, *args):
        raise self.err

    def __or__(self, *args):
        raise self.err

    # Reflected (right-hand operand) variants of the operators above.
    def __radd__(self, *args):
        raise self.err

    def __rsub__(self, *args):
        raise self.err

    def __rmul__(self, *args):
        raise self.err

    def __rmatmul__(self, *args):
        raise self.err

    def __rtruediv__(self, *args):
        raise self.err

    def __rfloordiv__(self, *args):
        raise self.err

    def __rmod__(self, *args):
        raise self.err

    def __rdivmod__(self, *args):
        raise self.err

    def __rpow__(self, *args):
        raise self.err

    def __rlshift__(self, *args):
        raise self.err

    def __rrshift__(self, *args):
        raise self.err

    def __rand__(self, *args):
        raise self.err

    def __rxor__(self, *args):
        raise self.err

    def __ror__(self, *args):
        raise self.err

    # In-place (augmented assignment) variants.
    def __iadd__(self, *args):
        raise self.err

    def __isub__(self, *args):
        raise self.err

    def __imul__(self, *args):
        raise self.err

    def __imatmul__(self, *args):
        raise self.err

    def __itruediv__(self, *args):
        raise self.err

    def __ifloordiv__(self, *args):
        raise self.err

    def __imod__(self, *args):
        raise self.err

    def __ipow__(self, *args):
        raise self.err

    def __ilshift__(self, *args):
        raise self.err

    def __irshift__(self, *args):
        raise self.err

    def __iand__(self, *args):
        raise self.err

    def __ixor__(self, *args):
        raise self.err

    def __ior__(self, *args):
        raise self.err

    # Unary operators.
    def __neg__(self, *args):
        raise self.err

    def __pos__(self, *args):
        raise self.err

    def __abs__(self, *args):
        raise self.err

    def __invert__(self, *args):
        raise self.err

    # Numeric conversion and rounding.
    def __complex__(self, *args):
        raise self.err

    def __int__(self, *args):
        raise self.err

    def __float__(self, *args):
        raise self.err

    def __index__(self, *args):
        raise self.err

    def __round__(self, *args):
        raise self.err

    def __trunc__(self, *args):
        raise self.err

    def __floor__(self, *args):
        raise self.err

    def __ceil__(self, *args):
        raise self.err

    # Context manager protocol.
    def __enter__(self, *args):
        raise self.err

    def __exit__(self, *args):
        raise self.err


class ExplodingVariable(BaseExplodingVariable):
    """
    Exploding variables raise an error when anything is done with them. They
    are used to patch variables that cannot be serialised for various reasons
    when passing context between containers for different SAME steps. The
    error should inform the user of the reason the variable cannot be
    serialised - dill might not support it, it may use too much memory etc.

    Our approach is to patch all special methods on the class:
    https://docs.python.org/3/reference/datamodel.html#special-method-names
    """

    def __init__(self, err):
        # Goes through the __setattr__ override below, which lets 'err' in.
        self.err = err

    # Patch to allow setting 'self.err' in the constructor.
    def __setattr__(self, attr, value):
        if attr == "err":
            # Write straight into the instance dict so that later reads of
            # self.err succeed through normal lookup.
            self.__dict__[attr] = value
            return

        # Any other attribute is handed to the base class, whose __setattr__
        # raises the stored error. super() already binds self, so it must not
        # be passed a second time.
        super().__setattr__(attr, value)


# Custom deserialiser for ExplodingVariable that bypasses the explosion.
def deserialise(err):
    # Rebuild the placeholder around the stored exception. The serialise
    # function registered with dill names this function as the reconstructor.
    restored = ExplodingVariable(err)
    return restored


# Custom serialiser for ExplodingVariable that bypasses the explosion.
@dill.register(ExplodingVariable)
def serialise(pickler, obj):
    # Reduce the object to a call of deserialise with the stored exception as
    # its single argument.
    rebuild_args = (obj.err,)
    pickler.save_reduce(deserialise, rebuild_args, obj=obj)
").decode()}
# Load session context into global namespace:
if { len(input_context) } > 0:
dill.load_session("{ input_context_path }")
# Run the user's notebook code:
{urlsafe_b64decode("YSA9IGEgKyA1CmIgPSBiICsgMTAgIyA0Ny41MzczMTM0MzI4CnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCmcgPSBzb21lX21hdGgoOCwgMjEpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCmogPSBnWzBdCmsgPSBnWzFdCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIyA1Ny41MzczMTM0MzI4CgpwcmludChmImo6IHtqfSIpCnByaW50KGYiazoge2t9IikKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjIDY3LjUzNzMxMzQzMjgKcHJpbnQoIjAuMC4yIikKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKcHJpbnQoZiJBY2Nlc3NpbmcgdGhlIHZhbHVlIG9mIEI6IHtifSIpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCg==").decode()}
# Various types of serialisation failures we'd like to inform the user of:
def _cannot_pickle_msg(k):
return "Variable '" + k +"' was defined in previous steps, but could not be serialised as it was not supported by 'dill'."
def _too_large_msg(k, mem):
return "Variable '" + k + "' was defined in previous steps, but could not be serialised as it used too much memory (" + str(mem) + "B)."
# Replace unserialisable variables with exploding variables so that the user
# knows why they cannot use them in future steps:
_all_keys = list(globals().keys())
for _k in _all_keys:
if not dill.pickles(globals()[_k], safe=True):
globals()[_k] = ExplodingVariable(Exception(_cannot_pickle_msg(_k)))
continue
_mem = asizeof.asizeof(globals()[_k])
if _mem >= 52428800:
globals()[_k] = ExplodingVariable(Exception(_too_large_msg(_k, _mem)))
continue
# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""
# Runs the user code in a new execution frame. Context from the previous
# component in the run is loaded into the session dynamically, and we run
# with a single globals() namespace to simulate top-level execution.
exec(user_code, globals(), globals())
# Post new session context to mlflow:
with Path(output_context_path).open("rb") as reader:
context = urlsafe_b64encode(reader.read())
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_002",
"metadata_type": "output",
"metadata_value": context,
"metadata_time": datetime.datetime.now().isoformat(),
})
import argparse
_parser = argparse.ArgumentParser(prog='Same step 002 b7dbb05511774bd7bab385301d43b388 fn', description='')
_parser.add_argument("--input-context", dest="input_context_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--run-info", dest="run_info", type=str, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--metadata-url", dest="metadata_url", type=str, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--output-context", dest="output_context_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = same_step_002_b7dbb05511774bd7bab385301d43b388_fn(**_parsed_args)
env:
- {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'}
- {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'}
- {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'}
- {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'}
image: library/python:3.9-slim-buster
inputs:
parameters:
- {name: AWS_ACCESS_KEY_ID}
- {name: AWS_SECRET_ACCESS_KEY}
- {name: MLFLOW_S3_ENDPOINT_URL}
- {name: MLFLOW_TRACKING_URI}
- {name: metadata_url}
- {name: run-info-fn-run_info}
artifacts:
- {name: same-step-001-60ad85fb69284d98992da58abd28344c-fn-output_context, path: /tmp/inputs/input_context/data}
outputs:
artifacts:
- {name: same-step-002-b7dbb05511774bd7bab385301d43b388-fn-output_context, path: /tmp/outputs/output_context/data}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": ["--input-context", {"inputPath": "input_context"}, {"if": {"cond":
{"isPresent": "run_info"}, "then": ["--run-info", {"inputValue": "run_info"}]}},
{"if": {"cond": {"isPresent": "metadata_url"}, "then": ["--metadata-url",
{"inputValue": "metadata_url"}]}}, "--output-context", {"outputPath": "output_context"}],
"command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip
install --quiet --no-warn-script-location ''dill'' ''requests'' ''pympler==1.0.1''
''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' ''pandas'' ''plotly''
''Requests'' ''scipy'' ''tensorflow'' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location ''dill'' ''requests''
''pympler==1.0.1'' ''chart_studio'' ''ipython'' ''matplotlib'' ''numpy''
''pandas'' ''plotly'' ''Requests'' ''scipy'' ''tensorflow'' --user) && \"$0\"
\"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
-u \"$program_path\" \"$@\"\n", "def _make_parent_dirs_and_return_path(file_path:
str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return
file_path\n\ndef same_step_002_b7dbb05511774bd7bab385301d43b388_fn(\n input_context_path,\n output_context_path,\n run_info=\"gAR9lC4=\",\n metadata_url=\"\",\n):\n from
base64 import urlsafe_b64encode, urlsafe_b64decode\n from pathlib import
Path\n import datetime\n import requests\n import tempfile\n import
dill\n import os\n\n input_context = None\n with Path(input_context_path).open(\"rb\")
as reader:\n input_context = reader.read()\n\n # Helper function
for posting metadata to mlflow.\n def post_metadata(json):\n if
metadata_url == \"\":\n return\n\n try:\n req
= requests.post(metadata_url, json=json)\n req.raise_for_status()\n except
requests.exceptions.HTTPError as err:\n print(f\"Error posting
metadata: {err}\")\n\n # Move to writable directory as user might want
to do file IO.\n # TODO: won''t persist across steps, might need support
in SDK?\n os.chdir(tempfile.mkdtemp())\n\n # Load information about
the current experiment run:\n run_info = dill.loads(urlsafe_b64decode(run_info))\n\n #
Post session context to mlflow.\n if len(input_context) > 0:\n input_context_str
= urlsafe_b64encode(input_context)\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_002\",\n \"metadata_type\": \"input\",\n \"metadata_value\":
input_context_str,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n #
User code for step, which we run in its own execution frame.\n user_code
= f\"\"\"\nfrom base64 import urlsafe_b64decode\nfrom pympler import asizeof\nfrom
tempfile import mktemp\nimport dill\nimport os\n\n# Install dependencies
from requirements file.\n_req_file = mktemp()\nwith open(_req_file, \"w\")
as _file:\n _file.write(urlsafe_b64decode(\"bnVtcHkgPT0gMS4yMS4yCg==\").decode())\nos.system(\"pip
install -r \" + _req_file)\n\n# Load the exploding variables module:\n{urlsafe_b64decode(\"import dill


class BaseExplodingVariable:
    """Base class for ExplodingVariable."""

    def __setattr__(self, *args):
        raise self.err

    def __getattr__(self, attr):
        raise self.err

    def __delattr__(self, *args):
        raise self.err

    def __repr__(self, *args):
        raise self.err

    def __str__(self, *args):
        raise self.err

    def __unicode__(self, *args):
        raise self.err

    def __bytes__(self, *args):
        raise self.err

    def __format__(self, *args):
        raise self.err

    def __lt__(self, *args):
        raise self.err

    def __le__(self, *args):
        raise self.err

    def __eq__(self, *args):
        raise self.err

    def __ne__(self, *args):
        raise self.err

    def __gt__(self, *args):
        raise self.err

    def __ge__(self, *args):
        raise self.err

    def __hash__(self, *args):
        raise self.err

    def __bool__(self, *args):
        raise self.err

    def __dir__(self, *args):
        raise self.err

    def __get__(self, *args):
        raise self.err

    def __set__(self, *args):
        raise self.err

    def __delete__(self, *args):
        raise self.err

    def __set_name__(self, *args):
        raise self.err

    def __instancecheck__(self, *args):
        raise self.err

    def __subclasscheck__(self, *args):
        raise self.err

    def __call__(self, *args):
        raise self.err

    def __len__(self, *args):
        raise self.err

    def __length_hint__(self, *args):
        raise self.err

    def __getitem__(self, *args):
        raise self.err

    def __setitem__(self, *args):
        raise self.err

    def __delitem__(self, *args):
        raise self.err

    def __missing__(self, *args):
        raise self.err

    def __item__(self, *args):
        raise self.err

    def __reversed__(self, *args):
        raise self.err

    def __contains__(self, *args):
        raise self.err

    def __add__(self, *args):
        raise self.err

    def __sub__(self, *args):
        raise self.err

    def __mul__(self, *args):
        raise self.err

    def __matmul__(self, *args):
        raise self.err

    def __truediv__(self, *args):
        raise self.err

    def __floordiv__(self, *args):
        raise self.err

    def __mod__(self, *args):
        raise self.err

    def __divmod__(self, *args):
        raise self.err

    def __pow__(self, *args):
        raise self.err

    def __lshift__(self, *args):
        raise self.err

    def __rshift__(self, *args):
        raise self.err

    def __and__(self, *args):
        raise self.err

    def __xor__(self, *args):
        raise self.err

    def __or__(self, *args):
        raise self.err

    def __radd__(self, *args):
        raise self.err

    def __rsub__(self, *args):
        raise self.err

    def __rmul__(self, *args):
        raise self.err

    def __rmatmul__(self, *args):
        raise self.err

    def __rtruediv__(self, *args):
        raise self.err

    def __rfloordiv__(self, *args):
        raise self.err

    def __rmod__(self, *args):
        raise self.err

    def __rdivmod__(self, *args):
        raise self.err

    def __rpow__(self, *args):
        raise self.err

    def __rlshift__(self, *args):
        raise self.err

    def __rrshift__(self, *args):
        raise self.err

    def __rand__(self, *args):
        raise self.err

    def __rxor__(self, *args):
        raise self.err

    def __ror__(self, *args):
        raise self.err

    def __iadd__(self, *args):
        raise self.err

    def __isub__(self, *args):
        raise self.err

    def __imul__(self, *args):
        raise self.err

    def __imatmul__(self, *args):
        raise self.err

    def __itruediv__(self, *args):
        raise self.err

    def __ifloordiv__(self, *args):
        raise self.err

    def __imod__(self, *args):
        raise self.err

    def __ipow__(self, *args):
        raise self.err

    def __ilshift__(self, *args):
        raise self.err

    def __irshift__(self, *args):
        raise self.err

    def __iand__(self, *args):
        raise self.err

    def __ixor__(self, *args):
        raise self.err

    def __ior__(self, *args):
        raise self.err

    def __neg__(self, *args):
        raise self.err

    def __pos__(self, *args):
        raise self.err

    def __abs__(self, *args):
        raise self.err

    def __invert__(self, *args):
        raise self.err

    def __complex__(self, *args):
        raise self.err

    def __int__(self, *args):
        raise self.err

    def __float__(self, *args):
        raise self.err

    def __index__(self, *args):
        raise self.err

    def __round__(self, *args):
        raise self.err

    def __trunc__(self, *args):
        raise self.err

    def __floor__(self, *args):
        raise self.err

    def __ceil__(self, *args):
        raise self.err

    def __enter__(self, *args):
        raise self.err

    def __exit__(self, *args):
        raise self.err


class ExplodingVariable(BaseExplodingVariable):
    """
    Exploding variables raise an error when anything is done with them. They
    are used to patch variables that cannot be serialised for various reasons
    when passing context between containers for different SAME steps. The
    error should inform the user of the reason the variable cannot be
    serialised - dill might not support it, it may use too much memory etc.

    Our approach is to patch all special methods on the class:
    https://docs.python.org/3/reference/datamodel.html#special-method-names
    """

    def __init__(self, err):
        self.err = err

    # Patch to allow setting 'self.err' in the constructor.
    def __setattr__(self, attr, value):
        if attr == "err":
            self.__dict__[attr] = value
            return

        super().__setattr__(self, attr, value)


# Custom deserialiser for ExplodingVariable that bypasses the explosion.
def deserialise(err):
    return ExplodingVariable(err)


# Custom serialiser for ExplodingVariable that bypasses the explosion.
@dill.register(ExplodingVariable)
def serialise(pickler, obj):
    pickler.save_reduce(deserialise, (obj.err,), obj=obj)
\").decode()}\n\n#
Load session context into global namespace:\nif { len(input_context) } >
0:\n dill.load_session(\"{ input_context_path }\")\n\n# Run the user''s
notebook code:\n{urlsafe_b64decode(\"YSA9IGEgKyA1CmIgPSBiICsgMTAgIyA0Ny41MzczMTM0MzI4CnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCmcgPSBzb21lX21hdGgoOCwgMjEpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCmogPSBnWzBdCmsgPSBnWzFdCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIyA1Ny41MzczMTM0MzI4CgpwcmludChmImo6IHtqfSIpCnByaW50KGYiazoge2t9IikKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjIDY3LjUzNzMxMzQzMjgKcHJpbnQoIjAuMC4yIikKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKcHJpbnQoZiJBY2Nlc3NpbmcgdGhlIHZhbHVlIG9mIEI6IHtifSIpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCg==\").decode()}\n\n#
Various types of serialisation failures we''d like to inform the user of:\ndef
_cannot_pickle_msg(k):\n return \"Variable ''\" + k +\"'' was defined
in previous steps, but could not be serialised as it was not supported by
''dill''.\"\n\ndef _too_large_msg(k, mem):\n return \"Variable ''\" +
k + \"'' was defined in previous steps, but could not be serialised as it
used too much memory (\" + str(mem) + \"B).\"\n\n# Replace unserialisable
variables with exploding variables so that the user\n# knows why they cannot
use them in future steps:\n_all_keys = list(globals().keys())\nfor _k in
_all_keys:\n if not dill.pickles(globals()[_k], safe=True):\n globals()[_k]
= ExplodingVariable(Exception(_cannot_pickle_msg(_k)))\n continue\n\n _mem
= asizeof.asizeof(globals()[_k])\n if _mem >= 52428800:\n globals()[_k]
= ExplodingVariable(Exception(_too_large_msg(_k, _mem)))\n continue\n\n#
Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n #
Runs the user code in a new execution frame. Context from the previous\n #
component in the run is loaded into the session dynamically, and we run\n #
with a single globals() namespace to simulate top-level execution.\n exec(user_code,
globals(), globals())\n\n # Post new session context to mlflow:\n with
Path(output_context_path).open(\"rb\") as reader:\n context = urlsafe_b64encode(reader.read())\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_002\",\n \"metadata_type\": \"output\",\n \"metadata_value\":
context,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\nimport
argparse\n_parser = argparse.ArgumentParser(prog=''Same step 002 b7dbb05511774bd7bab385301d43b388
fn'', description='''')\n_parser.add_argument(\"--input-context\", dest=\"input_context_path\",
type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--run-info\",
dest=\"run_info\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--metadata-url\",
dest=\"metadata_url\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-context\",
dest=\"output_context_path\", type=_make_parent_dirs_and_return_path, required=True,
default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs
= same_step_002_b7dbb05511774bd7bab385301d43b388_fn(**_parsed_args)\n"],
"image": "library/python:3.9-slim-buster"}}, "inputs": [{"name": "input_context",
"type": "String"}, {"default": "gAR9lC4=", "name": "run_info", "optional":
true}, {"default": "", "name": "metadata_url", "optional": true}], "name":
"Same step 002 b7dbb05511774bd7bab385301d43b388 fn", "outputs": [{"name":
"output_context", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{}',
pipelines.kubeflow.org/arguments.parameters: '{"metadata_url": "{{inputs.parameters.metadata_url}}",
"run_info": "{{inputs.parameters.run-info-fn-run_info}}"}', pipelines.kubeflow.org/max_cache_staleness: P0D}
# Workflow-level argument values. The names match the pipeline inputs
# declared in the pipeline_spec annotation at the top of this manifest.
arguments:
parameters:
- {name: context, value: ''}
# An empty metadata_url makes the step programs skip posting metadata.
- {name: metadata_url, value: ''}
# NOTE(review): the two values below are credentials stored in plain text.
# The step templates copy them into container env vars from these parameters;
# consider sourcing them from a Kubernetes Secret instead.
- {name: AWS_ACCESS_KEY_ID, value: minio}
- {name: AWS_SECRET_ACCESS_KEY, value: minio123}
# In-cluster service addresses, also exposed to the steps as env vars.
- {name: MLFLOW_S3_ENDPOINT_URL, value: 'http://combinator-minio.mlflow.svc.cluster.local:9000'}
- {name: MLFLOW_TRACKING_URI, value: 'http://combinator-mlflow.mlflow.svc.cluster.local:5000'}
serviceAccountName: pipeline-runner
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment