Created
May 7, 2022 00:40
-
-
Save aronchick/25d6fca71df0ef86846c40bec5cbc2c3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
apiVersion: argoproj.io/v1alpha1 | |
kind: Workflow | |
metadata: | |
generateName: compilation-of-pipelines- | |
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.12, pipelines.kubeflow.org/pipeline_compilation_time: '2022-05-06T23:57:02.426196', | |
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "", "name": "context", | |
"optional": true}, {"default": "", "name": "metadata_url", "optional": true}, | |
{"default": "minio", "name": "AWS_ACCESS_KEY_ID", "optional": true, "type": | |
"String"}, {"default": "minio123", "name": "AWS_SECRET_ACCESS_KEY", "optional": | |
true, "type": "String"}, {"default": "http://combinator-minio.mlflow.svc.cluster.local:9000", | |
"name": "MLFLOW_S3_ENDPOINT_URL", "optional": true, "type": "String"}, {"default": | |
"http://combinator-mlflow.mlflow.svc.cluster.local:5000", "name": "MLFLOW_TRACKING_URI", | |
"optional": true, "type": "String"}], "name": "Compilation of pipelines"}'} | |
labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.12} | |
spec: | |
entrypoint: compilation-of-pipelines | |
templates: | |
- name: compilation-of-pipelines | |
inputs: | |
parameters: | |
- {name: AWS_ACCESS_KEY_ID} | |
- {name: AWS_SECRET_ACCESS_KEY} | |
- {name: MLFLOW_S3_ENDPOINT_URL} | |
- {name: MLFLOW_TRACKING_URI} | |
- {name: metadata_url} | |
dag: | |
tasks: | |
- {name: run-info-fn, template: run-info-fn} | |
- name: same-step-000-a29af0569ab04006b678357f767987db-fn | |
template: same-step-000-a29af0569ab04006b678357f767987db-fn | |
dependencies: [run-info-fn] | |
arguments: | |
parameters: | |
- {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'} | |
- {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'} | |
- {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'} | |
- {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'} | |
- {name: metadata_url, value: '{{inputs.parameters.metadata_url}}'} | |
- {name: run-info-fn-run_info, value: '{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}'} | |
- name: same-step-001-60ad85fb69284d98992da58abd28344c-fn | |
template: same-step-001-60ad85fb69284d98992da58abd28344c-fn | |
dependencies: [run-info-fn, same-step-000-a29af0569ab04006b678357f767987db-fn] | |
arguments: | |
parameters: | |
- {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'} | |
- {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'} | |
- {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'} | |
- {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'} | |
- {name: metadata_url, value: '{{inputs.parameters.metadata_url}}'} | |
- {name: run-info-fn-run_info, value: '{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}'} | |
artifacts: | |
- {name: same-step-000-a29af0569ab04006b678357f767987db-fn-output_context, | |
from: '{{tasks.same-step-000-a29af0569ab04006b678357f767987db-fn.outputs.artifacts.same-step-000-a29af0569ab04006b678357f767987db-fn-output_context}}'} | |
- name: same-step-002-b7dbb05511774bd7bab385301d43b388-fn | |
template: same-step-002-b7dbb05511774bd7bab385301d43b388-fn | |
dependencies: [run-info-fn, same-step-001-60ad85fb69284d98992da58abd28344c-fn] | |
arguments: | |
parameters: | |
- {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'} | |
- {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'} | |
- {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'} | |
- {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'} | |
- {name: metadata_url, value: '{{inputs.parameters.metadata_url}}'} | |
- {name: run-info-fn-run_info, value: '{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}'} | |
artifacts: | |
- {name: same-step-001-60ad85fb69284d98992da58abd28344c-fn-output_context, | |
from: '{{tasks.same-step-001-60ad85fb69284d98992da58abd28344c-fn.outputs.artifacts.same-step-001-60ad85fb69284d98992da58abd28344c-fn-output_context}}'} | |
- name: run-info-fn | |
container: | |
args: [--run-id, '{{workflow.uid}}', '----output-paths', /tmp/outputs/run_info/data] | |
command: | |
- sh | |
- -c | |
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location | |
'kfp' 'dill' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet | |
--no-warn-script-location 'kfp' 'dill' --user) && "$0" "$@" | |
- sh | |
- -ec | |
- | | |
program_path=$(mktemp) | |
printf "%s" "$0" > "$program_path" | |
python3 -u "$program_path" "$@" | |
- | | |
def run_info_fn( | |
run_id, | |
): | |
from base64 import urlsafe_b64encode | |
from collections import namedtuple | |
import datetime | |
import base64 | |
import dill | |
import kfp | |
client = kfp.Client(host="http://ml-pipeline:8888") | |
run_info = client.get_run(run_id=run_id) | |
run_info_dict = { | |
"run_id": run_info.run.id, | |
"name": run_info.run.name, | |
"created_at": run_info.run.created_at.isoformat(), | |
"pipeline_id": run_info.run.pipeline_spec.pipeline_id, | |
} | |
# Track kubernetes resources associated wth the run. | |
for r in run_info.run.resource_references: | |
run_info_dict[f"{r.key.type.lower()}_id"] = r.key.id | |
# Base64-encoded as value is visible in kubeflow ui. | |
output = urlsafe_b64encode(dill.dumps(run_info_dict)) | |
return namedtuple("RunInfoOutput", ["run_info"])( | |
str(output, encoding="ascii") | |
) | |
def _serialize_str(str_value: str) -> str: | |
if not isinstance(str_value, str): | |
raise TypeError('Value "{}" has type "{}" instead of str.'.format( | |
str(str_value), str(type(str_value)))) | |
return str_value | |
import argparse | |
_parser = argparse.ArgumentParser(prog='Run info fn', description='') | |
_parser.add_argument("--run-id", dest="run_id", type=str, required=True, default=argparse.SUPPRESS) | |
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) | |
_parsed_args = vars(_parser.parse_args()) | |
_output_files = _parsed_args.pop("_output_paths", []) | |
_outputs = run_info_fn(**_parsed_args) | |
_output_serializers = [ | |
_serialize_str, | |
] | |
import os | |
for idx, output_file in enumerate(_output_files): | |
try: | |
os.makedirs(os.path.dirname(output_file)) | |
except OSError: | |
pass | |
with open(output_file, 'w') as f: | |
f.write(_output_serializers[idx](_outputs[idx])) | |
image: python:3.7 | |
outputs: | |
parameters: | |
- name: run-info-fn-run_info | |
valueFrom: {path: /tmp/outputs/run_info/data} | |
artifacts: | |
- {name: run-info-fn-run_info, path: /tmp/outputs/run_info/data} | |
metadata: | |
labels: | |
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12 | |
pipelines.kubeflow.org/pipeline-sdk-type: kfp | |
pipelines.kubeflow.org/enable_caching: "true" | |
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container": | |
{"args": ["--run-id", {"inputValue": "run_id"}, "----output-paths", {"outputPath": | |
"run_info"}], "command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 | |
python3 -m pip install --quiet --no-warn-script-location ''kfp'' ''dill'' | |
|| PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location | |
''kfp'' ''dill'' --user) && \"$0\" \"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf | |
\"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n", | |
"def run_info_fn(\n run_id,\n):\n from base64 import urlsafe_b64encode\n from | |
collections import namedtuple\n import datetime\n import base64\n import | |
dill\n import kfp\n\n client = kfp.Client(host=\"http://ml-pipeline:8888\")\n run_info | |
= client.get_run(run_id=run_id)\n\n run_info_dict = {\n \"run_id\": | |
run_info.run.id,\n \"name\": run_info.run.name,\n \"created_at\": | |
run_info.run.created_at.isoformat(),\n \"pipeline_id\": run_info.run.pipeline_spec.pipeline_id,\n }\n\n # | |
Track kubernetes resources associated wth the run.\n for r in run_info.run.resource_references:\n run_info_dict[f\"{r.key.type.lower()}_id\"] | |
= r.key.id\n\n # Base64-encoded as value is visible in kubeflow ui.\n output | |
= urlsafe_b64encode(dill.dumps(run_info_dict))\n\n return namedtuple(\"RunInfoOutput\", | |
[\"run_info\"])(\n str(output, encoding=\"ascii\")\n )\n\ndef | |
_serialize_str(str_value: str) -> str:\n if not isinstance(str_value, | |
str):\n raise TypeError(''Value \"{}\" has type \"{}\" instead of | |
str.''.format(\n str(str_value), str(type(str_value))))\n return | |
str_value\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Run | |
info fn'', description='''')\n_parser.add_argument(\"--run-id\", dest=\"run_id\", | |
type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"----output-paths\", | |
dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files | |
= _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = run_info_fn(**_parsed_args)\n\n_output_serializers | |
= [\n _serialize_str,\n\n]\n\nimport os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except | |
OSError:\n pass\n with open(output_file, ''w'') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"], | |
"image": "python:3.7"}}, "inputs": [{"name": "run_id", "type": "String"}], | |
"name": "Run info fn", "outputs": [{"name": "run_info", "type": "String"}]}', | |
pipelines.kubeflow.org/component_ref: '{}', pipelines.kubeflow.org/arguments.parameters: '{"run_id": | |
"{{workflow.uid}}"}'} | |
- name: same-step-000-a29af0569ab04006b678357f767987db-fn | |
container: | |
args: [--input-context, /tmp/inputs/input_context/data, --run-info, '{{inputs.parameters.run-info-fn-run_info}}', | |
--metadata-url, '{{inputs.parameters.metadata_url}}', --output-context, /tmp/outputs/output_context/data] | |
command: | |
- sh | |
- -c | |
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location | |
'dill' 'requests' 'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' | |
'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' || PIP_DISABLE_PIP_VERSION_CHECK=1 | |
python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests' | |
'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' | |
'Requests' 'scipy' 'tensorflow' --user) && "$0" "$@" | |
- sh | |
- -ec | |
- | | |
program_path=$(mktemp) | |
printf "%s" "$0" > "$program_path" | |
python3 -u "$program_path" "$@" | |
- | | |
def _make_parent_dirs_and_return_path(file_path: str): | |
import os | |
os.makedirs(os.path.dirname(file_path), exist_ok=True) | |
return file_path | |
def same_step_000_a29af0569ab04006b678357f767987db_fn( | |
input_context_path, | |
output_context_path, | |
run_info="gAR9lC4=", | |
metadata_url="", | |
): | |
from base64 import urlsafe_b64encode, urlsafe_b64decode | |
from pathlib import Path | |
import datetime | |
import requests | |
import tempfile | |
import dill | |
import os | |
input_context = None | |
with Path(input_context_path).open("rb") as reader: | |
input_context = reader.read() | |
# Helper function for posting metadata to mlflow. | |
def post_metadata(json): | |
if metadata_url == "": | |
return | |
try: | |
req = requests.post(metadata_url, json=json) | |
req.raise_for_status() | |
except requests.exceptions.HTTPError as err: | |
print(f"Error posting metadata: {err}") | |
# Move to writable directory as user might want to do file IO. | |
# TODO: won't persist across steps, might need support in SDK? | |
os.chdir(tempfile.mkdtemp()) | |
# Load information about the current experiment run: | |
run_info = dill.loads(urlsafe_b64decode(run_info)) | |
# Post session context to mlflow. | |
if len(input_context) > 0: | |
input_context_str = urlsafe_b64encode(input_context) | |
post_metadata({ | |
"experiment_id": run_info["experiment_id"], | |
"run_id": run_info["run_id"], | |
"step_id": "same_step_000", | |
"metadata_type": "input", | |
"metadata_value": input_context_str, | |
"metadata_time": datetime.datetime.now().isoformat(), | |
}) | |
# User code for step, which we run in its own execution frame. | |
user_code = f""" | |
from base64 import urlsafe_b64decode | |
from pympler import asizeof | |
from tempfile import mktemp | |
import dill | |
import os | |
# Install dependencies from requirements file. | |
_req_file = mktemp() | |
with open(_req_file, "w") as _file: | |
_file.write(urlsafe_b64decode("bnVtcHkgPT0gMS4yMS4yCg==").decode()) | |
os.system("pip install -r " + _req_file) | |
# Load the exploding variables module: | |
{urlsafe_b64decode("import dill


class BaseExplodingVariable:
    """Base class for ExplodingVariable.

    Every special method assigned below raises ``self.err``, so any use of an
    instance (attribute access, operators, conversions, comparisons, hashing,
    calling, indexing, context management, ...) surfaces the stored error.
    Subclasses are expected to place the error in ``self.__dict__["err"]`` so
    that reading ``self.err`` succeeds through normal attribute lookup.
    """

    def __getattr__(self, attr):
        # Only reached when normal lookup fails. If 'err' itself is missing
        # (e.g. an instance created via __new__ without running __init__),
        # evaluating `self.err` would re-enter __getattr__ without end; raise
        # AttributeError instead so hasattr()/getattr() behave sanely.
        if attr == "err":
            raise AttributeError(attr)
        raise self.err

    def _explode(self, *args):
        raise self.err

    # Attribute protocol.
    __setattr__ = __delattr__ = __dir__ = _explode
    # String conversion and formatting.
    __repr__ = __str__ = __unicode__ = __bytes__ = __format__ = _explode
    # Rich comparison, hashing and truthiness.
    __lt__ = __le__ = __eq__ = __ne__ = __gt__ = __ge__ = _explode
    __hash__ = __bool__ = _explode
    # Descriptor protocol.
    __get__ = __set__ = __delete__ = __set_name__ = _explode
    # Class-check hooks, calling and container protocol.
    __instancecheck__ = __subclasscheck__ = __call__ = _explode
    __len__ = __length_hint__ = __getitem__ = __setitem__ = _explode
    __delitem__ = __missing__ = __item__ = __reversed__ = _explode
    __contains__ = _explode
    # Binary arithmetic.
    __add__ = __sub__ = __mul__ = __matmul__ = __truediv__ = _explode
    __floordiv__ = __mod__ = __divmod__ = __pow__ = _explode
    __lshift__ = __rshift__ = __and__ = __xor__ = __or__ = _explode
    # Reflected arithmetic.
    __radd__ = __rsub__ = __rmul__ = __rmatmul__ = __rtruediv__ = _explode
    __rfloordiv__ = __rmod__ = __rdivmod__ = __rpow__ = _explode
    __rlshift__ = __rrshift__ = __rand__ = __rxor__ = __ror__ = _explode
    # In-place arithmetic.
    __iadd__ = __isub__ = __imul__ = __imatmul__ = __itruediv__ = _explode
    __ifloordiv__ = __imod__ = __ipow__ = _explode
    __ilshift__ = __irshift__ = __iand__ = __ixor__ = __ior__ = _explode
    # Unary operators and numeric conversion.
    __neg__ = __pos__ = __abs__ = __invert__ = _explode
    __complex__ = __int__ = __float__ = __index__ = _explode
    __round__ = __trunc__ = __floor__ = __ceil__ = _explode
    # Context manager protocol.
    __enter__ = __exit__ = _explode

    # Drop the helper name so `instance._explode` still falls through to
    # __getattr__ and explodes like any other attribute.
    del _explode


class ExplodingVariable(BaseExplodingVariable):
    """
    Exploding variables raise an error when anything is done with them. They
    are used to patch variables that cannot be serialised for various reasons
    when passing context between containers for different SAME steps. The
    error should inform the user of the reason the variable cannot be
    serialised - dill might not support it, it may use too much memory etc.

    Our approach is to patch all special methods on the class:
    https://docs.python.org/3/reference/datamodel.html#special-method-names

    Args:
        err: The exception raised whenever the variable is used.
    """

    def __init__(self, err):
        self.err = err

    def __setattr__(self, attr, value):
        # 'err' is the only attribute that may be assigned (needed by the
        # constructor). It is written straight into the instance dict so that
        # later `self.err` reads succeed via normal lookup.
        if attr == "err":
            self.__dict__[attr] = value
            return

        # Any other assignment is forwarded to the base class, which raises
        # the stored error. `super().__setattr__` is already bound to this
        # instance, so `self` must not be passed a second time.
        super().__setattr__(attr, value)


def deserialise(err):
    """Rebuild an ExplodingVariable around ``err`` when unpickling.

    ``serialise`` names this function as the reconstructor. It only invokes
    the constructor, so the stored error is never raised here.
    """
    restored = ExplodingVariable(err)
    return restored


@dill.register(ExplodingVariable)
def serialise(pickler, obj):
    """Pickle an ExplodingVariable as the call ``deserialise(obj.err)``.

    Reading ``obj.err`` does not explode: the constructor stores ``err`` in
    the instance ``__dict__``, so normal lookup finds it before
    ``__getattr__`` is consulted.
    """
    reduce_args = (obj.err,)
    pickler.save_reduce(deserialise, reduce_args, obj=obj)
").decode()} | |
# Load session context into global namespace: | |
if { len(input_context) } > 0: | |
dill.load_session("{ input_context_path }") | |
# Run the user's notebook code: | |
{urlsafe_b64decode("ZGF0YXNldCA9ICdzYW1wbGVfZGF0YScKZ3B1X3R5cGUgPSAnQTEwMCcKaW1wb3J0IHRlbnNvcmZsb3cKaW1wb3J0IGRhdGV0aW1lCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IDEwCmIgPSBhICsgNSAjMTUKZnJvbSBJUHl0aG9uLmRpc3BsYXkgaW1wb3J0IEltYWdlCgp1cmwgPSAnaHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL1NBTUUtUHJvamVjdC9TQU1FLXNhbXBsZXMvbWFpbi90ZXN0LWFydGlmYWN0cy9GYXJvZUlzbGFuZHMuanBlZycKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjMjUKCmZyb20gSVB5dGhvbiBpbXBvcnQgZGlzcGxheQpkaXNwbGF5LkltYWdlKHVybCkKCmltcG9ydCBwbG90bHkKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgpkZWYgc29tZV9tYXRoKHgsIHopIC0-IHR1cGxlOgogICAgcmV0dXJuIChyb3VuZCh4ICsgeiwgMiksIHJvdW5kKHggLyB6LCAyKSkKCmEgPSBhICogMjAKYiA9IGIgKiAxMDAgIzI1MDAKCnByaW50KGYiQiA9IHtifSIp").decode()} | |
# Various types of serialisation failures we'd like to inform the user of: | |
def _cannot_pickle_msg(k): | |
return "Variable '" + k +"' was defined in previous steps, but could not be serialised as it was not supported by 'dill'." | |
def _too_large_msg(k, mem): | |
return "Variable '" + k + "' was defined in previous steps, but could not be serialised as it used too much memory (" + str(mem) + "B)." | |
# Replace unserialisable variables with exploding variables so that the user | |
# knows why they cannot use them in future steps: | |
_all_keys = list(globals().keys()) | |
for _k in _all_keys: | |
if not dill.pickles(globals()[_k], safe=True): | |
globals()[_k] = ExplodingVariable(Exception(_cannot_pickle_msg(_k))) | |
continue | |
_mem = asizeof.asizeof(globals()[_k]) | |
if _mem >= 52428800: | |
globals()[_k] = ExplodingVariable(Exception(_too_large_msg(_k, _mem))) | |
continue | |
# Save new session context to disk for the next component: | |
dill.dump_session("{output_context_path}") | |
""" | |
# Runs the user code in a new execution frame. Context from the previous | |
# component in the run is loaded into the session dynamically, and we run | |
# with a single globals() namespace to simulate top-level execution. | |
exec(user_code, globals(), globals()) | |
# Post new session context to mlflow: | |
with Path(output_context_path).open("rb") as reader: | |
context = urlsafe_b64encode(reader.read()) | |
post_metadata({ | |
"experiment_id": run_info["experiment_id"], | |
"run_id": run_info["run_id"], | |
"step_id": "same_step_000", | |
"metadata_type": "output", | |
"metadata_value": context, | |
"metadata_time": datetime.datetime.now().isoformat(), | |
}) | |
import argparse | |
_parser = argparse.ArgumentParser(prog='Same step 000 a29af0569ab04006b678357f767987db fn', description='') | |
_parser.add_argument("--input-context", dest="input_context_path", type=str, required=True, default=argparse.SUPPRESS) | |
_parser.add_argument("--run-info", dest="run_info", type=str, required=False, default=argparse.SUPPRESS) | |
_parser.add_argument("--metadata-url", dest="metadata_url", type=str, required=False, default=argparse.SUPPRESS) | |
_parser.add_argument("--output-context", dest="output_context_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) | |
_parsed_args = vars(_parser.parse_args()) | |
_outputs = same_step_000_a29af0569ab04006b678357f767987db_fn(**_parsed_args) | |
env: | |
- {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'} | |
- {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'} | |
- {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'} | |
- {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'} | |
image: library/python:3.9-slim-buster | |
inputs: | |
parameters: | |
- {name: AWS_ACCESS_KEY_ID} | |
- {name: AWS_SECRET_ACCESS_KEY} | |
- {name: MLFLOW_S3_ENDPOINT_URL} | |
- {name: MLFLOW_TRACKING_URI} | |
- {name: metadata_url} | |
- {name: run-info-fn-run_info} | |
artifacts: | |
- name: input_context | |
path: /tmp/inputs/input_context/data | |
raw: {data: ''} | |
outputs: | |
artifacts: | |
- {name: same-step-000-a29af0569ab04006b678357f767987db-fn-output_context, path: /tmp/outputs/output_context/data} | |
metadata: | |
labels: | |
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12 | |
pipelines.kubeflow.org/pipeline-sdk-type: kfp | |
pipelines.kubeflow.org/enable_caching: "true" | |
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container": | |
{"args": ["--input-context", {"inputPath": "input_context"}, {"if": {"cond": | |
{"isPresent": "run_info"}, "then": ["--run-info", {"inputValue": "run_info"}]}}, | |
{"if": {"cond": {"isPresent": "metadata_url"}, "then": ["--metadata-url", | |
{"inputValue": "metadata_url"}]}}, "--output-context", {"outputPath": "output_context"}], | |
"command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip | |
install --quiet --no-warn-script-location ''dill'' ''requests'' ''pympler==1.0.1'' | |
''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' ''pandas'' ''plotly'' | |
''Requests'' ''scipy'' ''tensorflow'' || PIP_DISABLE_PIP_VERSION_CHECK=1 | |
python3 -m pip install --quiet --no-warn-script-location ''dill'' ''requests'' | |
''pympler==1.0.1'' ''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' | |
''pandas'' ''plotly'' ''Requests'' ''scipy'' ''tensorflow'' --user) && \"$0\" | |
\"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 | |
-u \"$program_path\" \"$@\"\n", "def _make_parent_dirs_and_return_path(file_path: | |
str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return | |
file_path\n\ndef same_step_000_a29af0569ab04006b678357f767987db_fn(\n input_context_path,\n output_context_path,\n run_info=\"gAR9lC4=\",\n metadata_url=\"\",\n):\n from | |
base64 import urlsafe_b64encode, urlsafe_b64decode\n from pathlib import | |
Path\n import datetime\n import requests\n import tempfile\n import | |
dill\n import os\n\n input_context = None\n with Path(input_context_path).open(\"rb\") | |
as reader:\n input_context = reader.read()\n\n # Helper function | |
for posting metadata to mlflow.\n def post_metadata(json):\n if | |
metadata_url == \"\":\n return\n\n try:\n req | |
= requests.post(metadata_url, json=json)\n req.raise_for_status()\n except | |
requests.exceptions.HTTPError as err:\n print(f\"Error posting | |
metadata: {err}\")\n\n # Move to writable directory as user might want | |
to do file IO.\n # TODO: won''t persist across steps, might need support | |
in SDK?\n os.chdir(tempfile.mkdtemp())\n\n # Load information about | |
the current experiment run:\n run_info = dill.loads(urlsafe_b64decode(run_info))\n\n # | |
Post session context to mlflow.\n if len(input_context) > 0:\n input_context_str | |
= urlsafe_b64encode(input_context)\n post_metadata({\n \"experiment_id\": | |
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\": | |
\"same_step_000\",\n \"metadata_type\": \"input\",\n \"metadata_value\": | |
input_context_str,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n # | |
User code for step, which we run in its own execution frame.\n user_code | |
= f\"\"\"\nfrom base64 import urlsafe_b64decode\nfrom pympler import asizeof\nfrom | |
tempfile import mktemp\nimport dill\nimport os\n\n# Install dependencies | |
from requirements file.\n_req_file = mktemp()\nwith open(_req_file, \"w\") | |
as _file:\n _file.write(urlsafe_b64decode(\"bnVtcHkgPT0gMS4yMS4yCg==\").decode())\nos.system(\"pip | |
install -r \" + _req_file)\n\n# Load the exploding variables module:\n{urlsafe_b64decode(\"import dill


class BaseExplodingVariable:
    """Base class for ExplodingVariable.

    Every special method assigned below raises ``self.err``, so any use of an
    instance (attribute access, operators, conversions, comparisons, hashing,
    calling, indexing, context management, ...) surfaces the stored error.
    Subclasses are expected to place the error in ``self.__dict__["err"]`` so
    that reading ``self.err`` succeeds through normal attribute lookup.
    """

    def __getattr__(self, attr):
        # Only reached when normal lookup fails. If 'err' itself is missing
        # (e.g. an instance created via __new__ without running __init__),
        # evaluating `self.err` would re-enter __getattr__ without end; raise
        # AttributeError instead so hasattr()/getattr() behave sanely.
        if attr == "err":
            raise AttributeError(attr)
        raise self.err

    def _explode(self, *args):
        raise self.err

    # Attribute protocol.
    __setattr__ = __delattr__ = __dir__ = _explode
    # String conversion and formatting.
    __repr__ = __str__ = __unicode__ = __bytes__ = __format__ = _explode
    # Rich comparison, hashing and truthiness.
    __lt__ = __le__ = __eq__ = __ne__ = __gt__ = __ge__ = _explode
    __hash__ = __bool__ = _explode
    # Descriptor protocol.
    __get__ = __set__ = __delete__ = __set_name__ = _explode
    # Class-check hooks, calling and container protocol.
    __instancecheck__ = __subclasscheck__ = __call__ = _explode
    __len__ = __length_hint__ = __getitem__ = __setitem__ = _explode
    __delitem__ = __missing__ = __item__ = __reversed__ = _explode
    __contains__ = _explode
    # Binary arithmetic.
    __add__ = __sub__ = __mul__ = __matmul__ = __truediv__ = _explode
    __floordiv__ = __mod__ = __divmod__ = __pow__ = _explode
    __lshift__ = __rshift__ = __and__ = __xor__ = __or__ = _explode
    # Reflected arithmetic.
    __radd__ = __rsub__ = __rmul__ = __rmatmul__ = __rtruediv__ = _explode
    __rfloordiv__ = __rmod__ = __rdivmod__ = __rpow__ = _explode
    __rlshift__ = __rrshift__ = __rand__ = __rxor__ = __ror__ = _explode
    # In-place arithmetic.
    __iadd__ = __isub__ = __imul__ = __imatmul__ = __itruediv__ = _explode
    __ifloordiv__ = __imod__ = __ipow__ = _explode
    __ilshift__ = __irshift__ = __iand__ = __ixor__ = __ior__ = _explode
    # Unary operators and numeric conversion.
    __neg__ = __pos__ = __abs__ = __invert__ = _explode
    __complex__ = __int__ = __float__ = __index__ = _explode
    __round__ = __trunc__ = __floor__ = __ceil__ = _explode
    # Context manager protocol.
    __enter__ = __exit__ = _explode

    # Drop the helper name so `instance._explode` still falls through to
    # __getattr__ and explodes like any other attribute.
    del _explode


class ExplodingVariable(BaseExplodingVariable):
    """
    Exploding variables raise an error when anything is done with them. They
    are used to patch variables that cannot be serialised for various reasons
    when passing context between containers for different SAME steps. The
    error should inform the user of the reason the variable cannot be
    serialised - dill might not support it, it may use too much memory etc.

    Our approach is to patch all special methods on the class:
    https://docs.python.org/3/reference/datamodel.html#special-method-names
    """

    def __init__(self, err):
        # err is the exception every later interaction re-raises.
        self.err = err

    # Only err may be assigned (by the constructor above). Every other
    # attribute assignment is handed to the base class, which raises self.err.
    def __setattr__(self, attr, value):
        if attr == "err":
            # Write straight into the instance dict: the base __setattr__
            # would raise instead of storing the value.
            self.__dict__[attr] = value
            return

        # super() is already bound to self, so only (attr, value) are passed;
        # passing self again would shift every argument by one.
        super().__setattr__(attr, value)


# Custom deserialiser for ExplodingVariable that bypasses the explosion.
def deserialise(err):
    # Inverse of serialise: wrap the stored error in a fresh placeholder.
    # Construction goes through ExplodingVariable.__init__, whose only
    # assignment (err) is the one attribute the class allows.
    rebuilt = ExplodingVariable(err)
    return rebuilt


# Custom serialiser for ExplodingVariable that bypasses the explosion.
@dill.register(ExplodingVariable)
def serialise(pickler, obj):
    # Pickle obj as the call deserialise(obj.err). Reading obj.err is safe:
    # ExplodingVariable stores err in the instance __dict__, so normal lookup
    # finds it without reaching the exploding __getattr__.
    reduce_args = (obj.err,)
    pickler.save_reduce(deserialise, reduce_args, obj=obj)
\").decode()}\n\n# | |
Load session context into global namespace:\nif { len(input_context) } > | |
0:\n dill.load_session(\"{ input_context_path }\")\n\n# Run the user''s | |
notebook code:\n{urlsafe_b64decode(\"ZGF0YXNldCA9ICdzYW1wbGVfZGF0YScKZ3B1X3R5cGUgPSAnQTEwMCcKaW1wb3J0IHRlbnNvcmZsb3cKaW1wb3J0IGRhdGV0aW1lCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IDEwCmIgPSBhICsgNSAjMTUKZnJvbSBJUHl0aG9uLmRpc3BsYXkgaW1wb3J0IEltYWdlCgp1cmwgPSAnaHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL1NBTUUtUHJvamVjdC9TQU1FLXNhbXBsZXMvbWFpbi90ZXN0LWFydGlmYWN0cy9GYXJvZUlzbGFuZHMuanBlZycKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjMjUKCmZyb20gSVB5dGhvbiBpbXBvcnQgZGlzcGxheQpkaXNwbGF5LkltYWdlKHVybCkKCmltcG9ydCBwbG90bHkKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgpkZWYgc29tZV9tYXRoKHgsIHopIC0-IHR1cGxlOgogICAgcmV0dXJuIChyb3VuZCh4ICsgeiwgMiksIHJvdW5kKHggLyB6LCAyKSkKCmEgPSBhICogMjAKYiA9IGIgKiAxMDAgIzI1MDAKCnByaW50KGYiQiA9IHtifSIp\").decode()}\n\n# | |
Various types of serialisation failures we''d like to inform the user of:\ndef | |
_cannot_pickle_msg(k):\n return \"Variable ''\" + k +\"'' was defined | |
in previous steps, but could not be serialised as it was not supported by | |
''dill''.\"\n\ndef _too_large_msg(k, mem):\n return \"Variable ''\" + | |
k + \"'' was defined in previous steps, but could not be serialised as it | |
used too much memory (\" + str(mem) + \"B).\"\n\n# Replace unserialisable | |
variables with exploding variables so that the user\n# knows why they cannot | |
use them in future steps:\n_all_keys = list(globals().keys())\nfor _k in | |
_all_keys:\n if not dill.pickles(globals()[_k], safe=True):\n globals()[_k] | |
= ExplodingVariable(Exception(_cannot_pickle_msg(_k)))\n continue\n\n _mem | |
= asizeof.asizeof(globals()[_k])\n if _mem >= 52428800:\n globals()[_k] | |
= ExplodingVariable(Exception(_too_large_msg(_k, _mem)))\n continue\n\n# | |
Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n # | |
Runs the user code in a new execution frame. Context from the previous\n # | |
component in the run is loaded into the session dynamically, and we run\n # | |
with a single globals() namespace to simulate top-level execution.\n exec(user_code, | |
globals(), globals())\n\n # Post new session context to mlflow:\n with | |
Path(output_context_path).open(\"rb\") as reader:\n context = urlsafe_b64encode(reader.read())\n post_metadata({\n \"experiment_id\": | |
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\": | |
\"same_step_000\",\n \"metadata_type\": \"output\",\n \"metadata_value\": | |
context,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\nimport | |
argparse\n_parser = argparse.ArgumentParser(prog=''Same step 000 a29af0569ab04006b678357f767987db | |
fn'', description='''')\n_parser.add_argument(\"--input-context\", dest=\"input_context_path\", | |
type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--run-info\", | |
dest=\"run_info\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--metadata-url\", | |
dest=\"metadata_url\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-context\", | |
dest=\"output_context_path\", type=_make_parent_dirs_and_return_path, required=True, | |
default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs | |
= same_step_000_a29af0569ab04006b678357f767987db_fn(**_parsed_args)\n"], | |
"image": "library/python:3.9-slim-buster"}}, "inputs": [{"name": "input_context", | |
"type": "String"}, {"default": "gAR9lC4=", "name": "run_info", "optional": | |
true}, {"default": "", "name": "metadata_url", "optional": true}], "name": | |
"Same step 000 a29af0569ab04006b678357f767987db fn", "outputs": [{"name": | |
"output_context", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{}', | |
pipelines.kubeflow.org/arguments.parameters: '{"metadata_url": "{{inputs.parameters.metadata_url}}", | |
"run_info": "{{inputs.parameters.run-info-fn-run_info}}"}', pipelines.kubeflow.org/max_cache_staleness: P0D} | |
- name: same-step-001-60ad85fb69284d98992da58abd28344c-fn | |
container: | |
args: [--input-context, /tmp/inputs/input_context/data, --run-info, '{{inputs.parameters.run-info-fn-run_info}}', | |
--metadata-url, '{{inputs.parameters.metadata_url}}', --output-context, /tmp/outputs/output_context/data] | |
command: | |
- sh | |
- -c | |
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location | |
'dill' 'requests' 'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' | |
'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' || PIP_DISABLE_PIP_VERSION_CHECK=1 | |
python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests' | |
'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' | |
'Requests' 'scipy' 'tensorflow' --user) && "$0" "$@" | |
- sh | |
- -ec | |
- | | |
program_path=$(mktemp) | |
printf "%s" "$0" > "$program_path" | |
python3 -u "$program_path" "$@" | |
- | | |
def _make_parent_dirs_and_return_path(file_path: str):
    """Create the parent directory of *file_path* if needed; return the path unchanged.

    Used as the argparse ``type=`` converter for ``--output-context`` so the
    step can write its output file straight away.
    """
    import os

    parent_dir = os.path.dirname(file_path)
    # A bare file name has no directory part; os.makedirs("") would raise
    # FileNotFoundError, so only create directories when there is one.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    return file_path
def same_step_001_60ad85fb69284d98992da58abd28344c_fn(
    input_context_path,
    output_context_path,
    run_info="gAR9lC4=",
    metadata_url="",
):
    """Run SAME step 001 and hand its Python session on to the next step.

    The previous step's dill session (when the input file is non-empty) is
    loaded into this module's globals(), the step's notebook code is executed
    there, variables that cannot be serialised are replaced by
    ExplodingVariable placeholders, and the resulting session is written to
    output_context_path with dill.dump_session. When metadata_url is non-empty,
    the input and output contexts are also POSTed to it as URL-safe base64 text.

    Args:
        input_context_path: File holding the previous step's dill session; an
            empty file means there is no prior context.
        output_context_path: File the new dill session is written to.
        run_info: URL-safe base64 of a dill pickle of a dict. The default
            decodes to an empty dict; its "experiment_id" and "run_id" keys are
            only read when metadata is actually posted.
        metadata_url: Endpoint for metadata POSTs; "" disables posting.
    """
    from base64 import urlsafe_b64encode, urlsafe_b64decode
    from pathlib import Path
    import datetime
    import requests
    import tempfile
    import dill
    import os

    with Path(input_context_path).open("rb") as reader:
        input_context = reader.read()

    # Helper for posting a session context to the metadata endpoint (best effort).
    def post_metadata(metadata_type, raw_context):
        # Return before touching run_info: the default run_info decodes to an
        # empty dict, so building the payload first raised KeyError even when
        # posting was disabled.
        if metadata_url == "":
            return

        payload = {
            "experiment_id": run_info["experiment_id"],
            "run_id": run_info["run_id"],
            "step_id": "same_step_001",
            "metadata_type": metadata_type,
            # urlsafe_b64encode returns bytes, which JSON cannot encode.
            "metadata_value": urlsafe_b64encode(raw_context).decode("ascii"),
            "metadata_time": datetime.datetime.now().isoformat(),
        }
        try:
            req = requests.post(metadata_url, json=payload)
            req.raise_for_status()
        except requests.exceptions.RequestException as err:
            # Covers HTTP errors as well as connection failures and timeouts.
            print(f"Error posting metadata: {err}")

    # Move to writable directory as user might want to do file IO.
    # TODO: won't persist across steps, might need support in SDK?
    os.chdir(tempfile.mkdtemp())

    # Load information about the current experiment run.
    # NOTE(review): dill.loads can execute arbitrary code; run_info must come
    # from a trusted source.
    run_info = dill.loads(urlsafe_b64decode(run_info))

    # Post the incoming session context.
    if len(input_context) > 0:
        post_metadata("input", input_context)

    # Source of the exploding-variables module, inserted verbatim near the top
    # of the generated user code. It is held in a '''-quoted string because it
    # contains """ docstrings, which would otherwise terminate the f-string
    # below (inlining it raw inside urlsafe_b64decode("...") was a SyntaxError).
    exploding_variables_src = '''import dill


class BaseExplodingVariable:
    """Base class for ExplodingVariable."""

    def __setattr__(self, *args):
        raise self.err

    def __getattr__(self, attr):
        # If err itself is missing, self.err would re-enter this method forever.
        if attr == "err":
            raise AttributeError(attr)
        raise self.err

    def __delattr__(self, *args):
        raise self.err

    def __repr__(self, *args):
        raise self.err

    def __str__(self, *args):
        raise self.err

    def __unicode__(self, *args):
        raise self.err

    def __bytes__(self, *args):
        raise self.err

    def __format__(self, *args):
        raise self.err

    def __lt__(self, *args):
        raise self.err

    def __le__(self, *args):
        raise self.err

    def __eq__(self, *args):
        raise self.err

    def __ne__(self, *args):
        raise self.err

    def __gt__(self, *args):
        raise self.err

    def __ge__(self, *args):
        raise self.err

    def __hash__(self, *args):
        raise self.err

    def __bool__(self, *args):
        raise self.err

    def __dir__(self, *args):
        raise self.err

    def __get__(self, *args):
        raise self.err

    def __set__(self, *args):
        raise self.err

    def __delete__(self, *args):
        raise self.err

    def __set_name__(self, *args):
        raise self.err

    def __instancecheck__(self, *args):
        raise self.err

    def __subclasscheck__(self, *args):
        raise self.err

    def __call__(self, *args):
        raise self.err

    def __len__(self, *args):
        raise self.err

    def __length_hint__(self, *args):
        raise self.err

    def __getitem__(self, *args):
        raise self.err

    def __setitem__(self, *args):
        raise self.err

    def __delitem__(self, *args):
        raise self.err

    def __missing__(self, *args):
        raise self.err

    def __item__(self, *args):
        raise self.err

    def __reversed__(self, *args):
        raise self.err

    def __contains__(self, *args):
        raise self.err

    def __add__(self, *args):
        raise self.err

    def __sub__(self, *args):
        raise self.err

    def __mul__(self, *args):
        raise self.err

    def __matmul__(self, *args):
        raise self.err

    def __truediv__(self, *args):
        raise self.err

    def __floordiv__(self, *args):
        raise self.err

    def __mod__(self, *args):
        raise self.err

    def __divmod__(self, *args):
        raise self.err

    def __pow__(self, *args):
        raise self.err

    def __lshift__(self, *args):
        raise self.err

    def __rshift__(self, *args):
        raise self.err

    def __and__(self, *args):
        raise self.err

    def __xor__(self, *args):
        raise self.err

    def __or__(self, *args):
        raise self.err

    def __radd__(self, *args):
        raise self.err

    def __rsub__(self, *args):
        raise self.err

    def __rmul__(self, *args):
        raise self.err

    def __rmatmul__(self, *args):
        raise self.err

    def __rtruediv__(self, *args):
        raise self.err

    def __rfloordiv__(self, *args):
        raise self.err

    def __rmod__(self, *args):
        raise self.err

    def __rdivmod__(self, *args):
        raise self.err

    def __rpow__(self, *args):
        raise self.err

    def __rlshift__(self, *args):
        raise self.err

    def __rrshift__(self, *args):
        raise self.err

    def __rand__(self, *args):
        raise self.err

    def __rxor__(self, *args):
        raise self.err

    def __ror__(self, *args):
        raise self.err

    def __iadd__(self, *args):
        raise self.err

    def __isub__(self, *args):
        raise self.err

    def __imul__(self, *args):
        raise self.err

    def __imatmul__(self, *args):
        raise self.err

    def __itruediv__(self, *args):
        raise self.err

    def __ifloordiv__(self, *args):
        raise self.err

    def __imod__(self, *args):
        raise self.err

    def __ipow__(self, *args):
        raise self.err

    def __ilshift__(self, *args):
        raise self.err

    def __irshift__(self, *args):
        raise self.err

    def __iand__(self, *args):
        raise self.err

    def __ixor__(self, *args):
        raise self.err

    def __ior__(self, *args):
        raise self.err

    def __neg__(self, *args):
        raise self.err

    def __pos__(self, *args):
        raise self.err

    def __abs__(self, *args):
        raise self.err

    def __invert__(self, *args):
        raise self.err

    def __complex__(self, *args):
        raise self.err

    def __int__(self, *args):
        raise self.err

    def __float__(self, *args):
        raise self.err

    def __index__(self, *args):
        raise self.err

    def __round__(self, *args):
        raise self.err

    def __trunc__(self, *args):
        raise self.err

    def __floor__(self, *args):
        raise self.err

    def __ceil__(self, *args):
        raise self.err

    def __enter__(self, *args):
        raise self.err

    def __exit__(self, *args):
        raise self.err


class ExplodingVariable(BaseExplodingVariable):
    """
    Exploding variables raise an error when anything is done with them. They
    are used to patch variables that cannot be serialised for various reasons
    when passing context between containers for different SAME steps. The
    error should inform the user of the reason the variable cannot be
    serialised - dill might not support it, it may use too much memory etc.

    Our approach is to patch all special methods on the class:
    https://docs.python.org/3/reference/datamodel.html#special-method-names
    """

    def __init__(self, err):
        self.err = err

    # Patch to allow setting 'self.err' in the constructor.
    def __setattr__(self, attr, value):
        if attr == "err":
            self.__dict__[attr] = value
            return

        super().__setattr__(attr, value)


# Custom deserialiser for ExplodingVariable that bypasses the explosion.
def deserialise(err):
    return ExplodingVariable(err)


# Custom serialiser for ExplodingVariable that bypasses the explosion.
@dill.register(ExplodingVariable)
def serialise(pickler, obj):
    pickler.save_reduce(deserialise, (obj.err,), obj=obj)
'''

    # User code for step, which we run in its own execution frame.
    # NOTE(review): tempfile.mktemp below is race-prone; mkstemp would be safer.
    user_code = f"""
from base64 import urlsafe_b64decode
from pympler import asizeof
from tempfile import mktemp
import dill
import os

# Install dependencies from requirements file.
_req_file = mktemp()
with open(_req_file, "w") as _file:
    _file.write(urlsafe_b64decode("bnVtcHkgPT0gMS4yMS4yCg==").decode())
os.system("pip install -r " + _req_file)

# Load the exploding variables module:
{exploding_variables_src}

# Load session context into global namespace:
if { len(input_context) } > 0:
    dill.load_session("{ input_context_path }")

# Run the user's notebook code:
{urlsafe_b64decode("aW1wb3J0IG51bXB5IGFzIG5wCmltcG9ydCBtYXRwbG90bGliLnB5cGxvdCBhcyBwbHQKaW1wb3J0IHNjaXB5LnN0YXRzIGFzIHN0YXRzCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKbXUgPSAwCnN0ZCA9IDEKCnggPSBucC5saW5zcGFjZShzdGFydD0tNCwgc3RvcD00LCBudW09MTAwKQp5ID0gc3RhdHMubm9ybS5wZGYoeCwgbXUsIHN0ZCkKCmEgPSBhICsgNQpiID0gYiArIDEwICMgMjUxNSAKCnBsdC5wbG90KHgsIHkpCnBsdC5zaG93KCkKaW1wb3J0IHJlcXVlc3RzCmltcG9ydCBwYW5kYXMgYXMgcGQKaW1wb3J0IHBsb3RseS5maWd1cmVfZmFjdG9yeSBhcyBmZgppbXBvcnQgY2hhcnRfc3R1ZGlvLnBsb3RseSBhcyBweQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCnVybCA9ICdodHRwczovL3Jhdy5naXRodWJ1c2VyY29udGVudC5jb20vU0FNRS1Qcm9qZWN0L1NBTUUtc2FtcGxlcy9tYWluL3Rlc3QtYXJ0aWZhY3RzL3Rlc3QuY3N2JwpkZiA9IHBkLnJlYWRfY3N2KHVybCkKCmEgPSBhICogMTAwMApiID0gYiAvIDY3ICMgMzcuNTM3MzEzNDMyOAoKZGYuZGVzY3JpYmUoKQ==").decode()}

# Various types of serialisation failures we'd like to inform the user of:
def _cannot_pickle_msg(k):
    return "Variable '" + k +"' was defined in previous steps, but could not be serialised as it was not supported by 'dill'."

def _too_large_msg(k, mem):
    return "Variable '" + k + "' was defined in previous steps, but could not be serialised as it used too much memory (" + str(mem) + "B)."

# Replace unserialisable variables with exploding variables so that the user
# knows why they cannot use them in future steps:
_all_keys = list(globals().keys())
for _k in _all_keys:
    if not dill.pickles(globals()[_k], safe=True):
        globals()[_k] = ExplodingVariable(Exception(_cannot_pickle_msg(_k)))
        continue

    _mem = asizeof.asizeof(globals()[_k])
    if _mem >= 52428800:
        globals()[_k] = ExplodingVariable(Exception(_too_large_msg(_k, _mem)))
        continue

# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""

    # Runs the user code in a new execution frame. Context from the previous
    # component in the run is loaded into the session dynamically, and we run
    # with a single globals() namespace to simulate top-level execution.
    exec(user_code, globals(), globals())

    # Post the outgoing session context.
    with Path(output_context_path).open("rb") as reader:
        output_context = reader.read()
    post_metadata("output", output_context)
import argparse

# Command-line entry point. The names bound here (argparse, _parser,
# _parsed_args, _outputs) are module globals, and the step function executes
# its generated code in globals() and dumps that namespace with
# dill.dump_session, so these names are deliberately kept unchanged.
_parser = argparse.ArgumentParser(
    description='',
    prog='Same step 001 60ad85fb69284d98992da58abd28344c fn',
)
_parser.add_argument(
    "--input-context",
    type=str,
    dest="input_context_path",
    default=argparse.SUPPRESS,
    required=True,
)
_parser.add_argument(
    "--run-info",
    type=str,
    dest="run_info",
    default=argparse.SUPPRESS,
    required=False,
)
_parser.add_argument(
    "--metadata-url",
    type=str,
    dest="metadata_url",
    default=argparse.SUPPRESS,
    required=False,
)
_parser.add_argument(
    "--output-context",
    # The converter also creates the parent directory of the output path.
    type=_make_parent_dirs_and_return_path,
    dest="output_context_path",
    default=argparse.SUPPRESS,
    required=True,
)
_parsed_args = vars(_parser.parse_args())

# Flags left off the command line are absent from _parsed_args (SUPPRESS
# default), so the function falls back to its own parameter defaults.
_outputs = same_step_001_60ad85fb69284d98992da58abd28344c_fn(**_parsed_args)
env: | |
- {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'} | |
- {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'} | |
- {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'} | |
- {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'} | |
image: library/python:3.9-slim-buster | |
inputs: | |
parameters: | |
- {name: AWS_ACCESS_KEY_ID} | |
- {name: AWS_SECRET_ACCESS_KEY} | |
- {name: MLFLOW_S3_ENDPOINT_URL} | |
- {name: MLFLOW_TRACKING_URI} | |
- {name: metadata_url} | |
- {name: run-info-fn-run_info} | |
artifacts: | |
- {name: same-step-000-a29af0569ab04006b678357f767987db-fn-output_context, path: /tmp/inputs/input_context/data} | |
outputs: | |
artifacts: | |
- {name: same-step-001-60ad85fb69284d98992da58abd28344c-fn-output_context, path: /tmp/outputs/output_context/data} | |
metadata: | |
labels: | |
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12 | |
pipelines.kubeflow.org/pipeline-sdk-type: kfp | |
pipelines.kubeflow.org/enable_caching: "true" | |
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container": | |
{"args": ["--input-context", {"inputPath": "input_context"}, {"if": {"cond": | |
{"isPresent": "run_info"}, "then": ["--run-info", {"inputValue": "run_info"}]}}, | |
{"if": {"cond": {"isPresent": "metadata_url"}, "then": ["--metadata-url", | |
{"inputValue": "metadata_url"}]}}, "--output-context", {"outputPath": "output_context"}], | |
"command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip | |
install --quiet --no-warn-script-location ''dill'' ''requests'' ''pympler==1.0.1'' | |
''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' ''pandas'' ''plotly'' | |
''Requests'' ''scipy'' ''tensorflow'' || PIP_DISABLE_PIP_VERSION_CHECK=1 | |
python3 -m pip install --quiet --no-warn-script-location ''dill'' ''requests'' | |
''pympler==1.0.1'' ''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' | |
''pandas'' ''plotly'' ''Requests'' ''scipy'' ''tensorflow'' --user) && \"$0\" | |
\"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 | |
-u \"$program_path\" \"$@\"\n", "def _make_parent_dirs_and_return_path(file_path: | |
str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return | |
file_path\n\ndef same_step_001_60ad85fb69284d98992da58abd28344c_fn(\n input_context_path,\n output_context_path,\n run_info=\"gAR9lC4=\",\n metadata_url=\"\",\n):\n from | |
base64 import urlsafe_b64encode, urlsafe_b64decode\n from pathlib import | |
Path\n import datetime\n import requests\n import tempfile\n import | |
dill\n import os\n\n input_context = None\n with Path(input_context_path).open(\"rb\") | |
as reader:\n input_context = reader.read()\n\n # Helper function | |
for posting metadata to mlflow.\n def post_metadata(json):\n if | |
metadata_url == \"\":\n return\n\n try:\n req | |
= requests.post(metadata_url, json=json)\n req.raise_for_status()\n except | |
requests.exceptions.HTTPError as err:\n print(f\"Error posting | |
metadata: {err}\")\n\n # Move to writable directory as user might want | |
to do file IO.\n # TODO: won''t persist across steps, might need support | |
in SDK?\n os.chdir(tempfile.mkdtemp())\n\n # Load information about | |
the current experiment run:\n run_info = dill.loads(urlsafe_b64decode(run_info))\n\n # | |
Post session context to mlflow.\n if len(input_context) > 0:\n input_context_str | |
= urlsafe_b64encode(input_context)\n post_metadata({\n \"experiment_id\": | |
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\": | |
\"same_step_001\",\n \"metadata_type\": \"input\",\n \"metadata_value\": | |
input_context_str,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n # | |
User code for step, which we run in its own execution frame.\n user_code | |
= f\"\"\"\nfrom base64 import urlsafe_b64decode\nfrom pympler import asizeof\nfrom | |
tempfile import mktemp\nimport dill\nimport os\n\n# Install dependencies | |
from requirements file.\n_req_file = mktemp()\nwith open(_req_file, \"w\") | |
as _file:\n _file.write(urlsafe_b64decode(\"bnVtcHkgPT0gMS4yMS4yCg==\").decode())\nos.system(\"pip | |
install -r \" + _req_file)\n\n# Load the exploding variables module:\n{urlsafe_b64decode(\"import dill


class BaseExplodingVariable:
    """Base class for ExplodingVariable.

    Every special method below re-raises the exception stored on the instance
    as self.err. Python looks special methods up on the type, so defining them
    on the class makes operators, builtins such as len, repr, hash and bool,
    and with-blocks involving an instance fail with that error.

    err must be placed in the instance __dict__ directly (as ExplodingVariable
    does), because __setattr__ here always raises.
    """

    # Attribute access.

    def __setattr__(self, *args):
        raise self.err

    def __getattr__(self, attr):
        # Only reached when normal lookup fails. If err itself is missing (an
        # instance created without running __init__), evaluating self.err would
        # re-enter this method forever; report an AttributeError instead of
        # overflowing the stack.
        if attr == "err":
            raise AttributeError(attr)
        raise self.err

    def __delattr__(self, *args):
        raise self.err

    # String conversion.

    def __repr__(self, *args):
        raise self.err

    def __str__(self, *args):
        raise self.err

    # Python 2 hook; Python 3 never calls it.
    def __unicode__(self, *args):
        raise self.err

    def __bytes__(self, *args):
        raise self.err

    def __format__(self, *args):
        raise self.err

    # Comparison, hashing, truthiness and introspection.

    def __lt__(self, *args):
        raise self.err

    def __le__(self, *args):
        raise self.err

    def __eq__(self, *args):
        raise self.err

    def __ne__(self, *args):
        raise self.err

    def __gt__(self, *args):
        raise self.err

    def __ge__(self, *args):
        raise self.err

    def __hash__(self, *args):
        raise self.err

    def __bool__(self, *args):
        raise self.err

    def __dir__(self, *args):
        raise self.err

    # Descriptor hooks: only triggered if an instance is stored as a class
    # attribute.

    def __get__(self, *args):
        raise self.err

    def __set__(self, *args):
        raise self.err

    def __delete__(self, *args):
        raise self.err

    def __set_name__(self, *args):
        raise self.err

    # These two only take effect when defined on a metaclass.

    def __instancecheck__(self, *args):
        raise self.err

    def __subclasscheck__(self, *args):
        raise self.err

    # Calling and container protocols.

    def __call__(self, *args):
        raise self.err

    def __len__(self, *args):
        raise self.err

    def __length_hint__(self, *args):
        raise self.err

    def __getitem__(self, *args):
        raise self.err

    def __setitem__(self, *args):
        raise self.err

    def __delitem__(self, *args):
        raise self.err

    # Only consulted by dict subclasses.
    def __missing__(self, *args):
        raise self.err

    # Not a hook Python itself looks up.
    def __item__(self, *args):
        raise self.err

    def __reversed__(self, *args):
        raise self.err

    def __contains__(self, *args):
        raise self.err

    # Binary arithmetic and bitwise operators (self OP other).

    def __add__(self, *args):
        raise self.err

    def __sub__(self, *args):
        raise self.err

    def __mul__(self, *args):
        raise self.err

    def __matmul__(self, *args):
        raise self.err

    def __truediv__(self, *args):
        raise self.err

    def __floordiv__(self, *args):
        raise self.err

    def __mod__(self, *args):
        raise self.err

    def __divmod__(self, *args):
        raise self.err

    def __pow__(self, *args):
        raise self.err

    def __lshift__(self, *args):
        raise self.err

    def __rshift__(self, *args):
        raise self.err

    def __and__(self, *args):
        raise self.err

    def __xor__(self, *args):
        raise self.err

    def __or__(self, *args):
        raise self.err

    # Reflected operators (other OP self).

    def __radd__(self, *args):
        raise self.err

    def __rsub__(self, *args):
        raise self.err

    def __rmul__(self, *args):
        raise self.err

    def __rmatmul__(self, *args):
        raise self.err

    def __rtruediv__(self, *args):
        raise self.err

    def __rfloordiv__(self, *args):
        raise self.err

    def __rmod__(self, *args):
        raise self.err

    def __rdivmod__(self, *args):
        raise self.err

    def __rpow__(self, *args):
        raise self.err

    def __rlshift__(self, *args):
        raise self.err

    def __rrshift__(self, *args):
        raise self.err

    def __rand__(self, *args):
        raise self.err

    def __rxor__(self, *args):
        raise self.err

    def __ror__(self, *args):
        raise self.err

    # In-place operators (self OP= other).

    def __iadd__(self, *args):
        raise self.err

    def __isub__(self, *args):
        raise self.err

    def __imul__(self, *args):
        raise self.err

    def __imatmul__(self, *args):
        raise self.err

    def __itruediv__(self, *args):
        raise self.err

    def __ifloordiv__(self, *args):
        raise self.err

    def __imod__(self, *args):
        raise self.err

    def __ipow__(self, *args):
        raise self.err

    def __ilshift__(self, *args):
        raise self.err

    def __irshift__(self, *args):
        raise self.err

    def __iand__(self, *args):
        raise self.err

    def __ixor__(self, *args):
        raise self.err

    def __ior__(self, *args):
        raise self.err

    # Unary operators and numeric conversions.

    def __neg__(self, *args):
        raise self.err

    def __pos__(self, *args):
        raise self.err

    def __abs__(self, *args):
        raise self.err

    def __invert__(self, *args):
        raise self.err

    def __complex__(self, *args):
        raise self.err

    def __int__(self, *args):
        raise self.err

    def __float__(self, *args):
        raise self.err

    def __index__(self, *args):
        raise self.err

    def __round__(self, *args):
        raise self.err

    def __trunc__(self, *args):
        raise self.err

    def __floor__(self, *args):
        raise self.err

    def __ceil__(self, *args):
        raise self.err

    # Context-manager protocol.

    def __enter__(self, *args):
        raise self.err

    def __exit__(self, *args):
        raise self.err


class ExplodingVariable(BaseExplodingVariable):
    """
    Exploding variables raise an error when anything is done with them. They
    are used to patch variables that cannot be serialised for various reasons
    when passing context between containers for different SAME steps. The
    error should inform the user of the reason the variable cannot be
    serialised - dill might not support it, it may use too much memory etc.

    Our approach is to patch all special methods on the class:
    https://docs.python.org/3/reference/datamodel.html#special-method-names
    """

    def __init__(self, err):
        # err is the exception every later interaction re-raises.
        self.err = err

    # Only err may be assigned (by the constructor above). Every other
    # attribute assignment is handed to the base class, which raises self.err.
    def __setattr__(self, attr, value):
        if attr == "err":
            # Write straight into the instance dict: the base __setattr__
            # would raise instead of storing the value.
            self.__dict__[attr] = value
            return

        # super() is already bound to self, so only (attr, value) are passed;
        # passing self again would shift every argument by one.
        super().__setattr__(attr, value)


# Custom deserialiser for ExplodingVariable that bypasses the explosion.
def deserialise(err):
    # Inverse of serialise: wrap the stored error in a fresh placeholder.
    # Construction goes through ExplodingVariable.__init__, whose only
    # assignment (err) is the one attribute the class allows.
    rebuilt = ExplodingVariable(err)
    return rebuilt


# Custom serialiser for ExplodingVariable that bypasses the explosion.
@dill.register(ExplodingVariable)
def serialise(pickler, obj):
    # Pickle obj as the call deserialise(obj.err). Reading obj.err is safe:
    # ExplodingVariable stores err in the instance __dict__, so normal lookup
    # finds it without reaching the exploding __getattr__.
    reduce_args = (obj.err,)
    pickler.save_reduce(deserialise, reduce_args, obj=obj)
\").decode()}\n\n# | |
Load session context into global namespace:\nif { len(input_context) } > | |
0:\n dill.load_session(\"{ input_context_path }\")\n\n# Run the user''s | |
notebook code:\n{urlsafe_b64decode(\"aW1wb3J0IG51bXB5IGFzIG5wCmltcG9ydCBtYXRwbG90bGliLnB5cGxvdCBhcyBwbHQKaW1wb3J0IHNjaXB5LnN0YXRzIGFzIHN0YXRzCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKbXUgPSAwCnN0ZCA9IDEKCnggPSBucC5saW5zcGFjZShzdGFydD0tNCwgc3RvcD00LCBudW09MTAwKQp5ID0gc3RhdHMubm9ybS5wZGYoeCwgbXUsIHN0ZCkKCmEgPSBhICsgNQpiID0gYiArIDEwICMgMjUxNSAKCnBsdC5wbG90KHgsIHkpCnBsdC5zaG93KCkKaW1wb3J0IHJlcXVlc3RzCmltcG9ydCBwYW5kYXMgYXMgcGQKaW1wb3J0IHBsb3RseS5maWd1cmVfZmFjdG9yeSBhcyBmZgppbXBvcnQgY2hhcnRfc3R1ZGlvLnBsb3RseSBhcyBweQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCnVybCA9ICdodHRwczovL3Jhdy5naXRodWJ1c2VyY29udGVudC5jb20vU0FNRS1Qcm9qZWN0L1NBTUUtc2FtcGxlcy9tYWluL3Rlc3QtYXJ0aWZhY3RzL3Rlc3QuY3N2JwpkZiA9IHBkLnJlYWRfY3N2KHVybCkKCmEgPSBhICogMTAwMApiID0gYiAvIDY3ICMgMzcuNTM3MzEzNDMyOAoKZGYuZGVzY3JpYmUoKQ==\").decode()}\n\n# | |
Various types of serialisation failures we''d like to inform the user of:\ndef | |
_cannot_pickle_msg(k):\n return \"Variable ''\" + k +\"'' was defined | |
in previous steps, but could not be serialised as it was not supported by | |
''dill''.\"\n\ndef _too_large_msg(k, mem):\n return \"Variable ''\" + | |
k + \"'' was defined in previous steps, but could not be serialised as it | |
used too much memory (\" + str(mem) + \"B).\"\n\n# Replace unserialisable | |
variables with exploding variables so that the user\n# knows why they cannot | |
use them in future steps:\n_all_keys = list(globals().keys())\nfor _k in | |
_all_keys:\n if not dill.pickles(globals()[_k], safe=True):\n globals()[_k] | |
= ExplodingVariable(Exception(_cannot_pickle_msg(_k)))\n continue\n\n _mem | |
= asizeof.asizeof(globals()[_k])\n if _mem >= 52428800:\n globals()[_k] | |
= ExplodingVariable(Exception(_too_large_msg(_k, _mem)))\n continue\n\n# | |
Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n # | |
Runs the user code in a new execution frame. Context from the previous\n # | |
component in the run is loaded into the session dynamically, and we run\n # | |
with a single globals() namespace to simulate top-level execution.\n exec(user_code, | |
globals(), globals())\n\n # Post new session context to mlflow:\n with | |
Path(output_context_path).open(\"rb\") as reader:\n context = urlsafe_b64encode(reader.read())\n post_metadata({\n \"experiment_id\": | |
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\": | |
\"same_step_001\",\n \"metadata_type\": \"output\",\n \"metadata_value\": | |
context,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\nimport | |
argparse\n_parser = argparse.ArgumentParser(prog=''Same step 001 60ad85fb69284d98992da58abd28344c | |
fn'', description='''')\n_parser.add_argument(\"--input-context\", dest=\"input_context_path\", | |
type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--run-info\", | |
dest=\"run_info\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--metadata-url\", | |
dest=\"metadata_url\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-context\", | |
dest=\"output_context_path\", type=_make_parent_dirs_and_return_path, required=True, | |
default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs | |
= same_step_001_60ad85fb69284d98992da58abd28344c_fn(**_parsed_args)\n"], | |
"image": "library/python:3.9-slim-buster"}}, "inputs": [{"name": "input_context", | |
"type": "String"}, {"default": "gAR9lC4=", "name": "run_info", "optional": | |
true}, {"default": "", "name": "metadata_url", "optional": true}], "name": | |
"Same step 001 60ad85fb69284d98992da58abd28344c fn", "outputs": [{"name": | |
"output_context", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{}', | |
pipelines.kubeflow.org/arguments.parameters: '{"metadata_url": "{{inputs.parameters.metadata_url}}", | |
"run_info": "{{inputs.parameters.run-info-fn-run_info}}"}', pipelines.kubeflow.org/max_cache_staleness: P0D} | |
- name: same-step-002-b7dbb05511774bd7bab385301d43b388-fn | |
container: | |
args: [--input-context, /tmp/inputs/input_context/data, --run-info, '{{inputs.parameters.run-info-fn-run_info}}', | |
--metadata-url, '{{inputs.parameters.metadata_url}}', --output-context, /tmp/outputs/output_context/data] | |
command: | |
- sh | |
- -c | |
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location | |
'dill' 'requests' 'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' | |
'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' || PIP_DISABLE_PIP_VERSION_CHECK=1 | |
python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests' | |
'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' | |
'Requests' 'scipy' 'tensorflow' --user) && "$0" "$@" | |
- sh | |
- -ec | |
- | | |
program_path=$(mktemp) | |
printf "%s" "$0" > "$program_path" | |
python3 -u "$program_path" "$@" | |
- | | |
def _make_parent_dirs_and_return_path(file_path: str): | |
import os | |
os.makedirs(os.path.dirname(file_path), exist_ok=True) | |
return file_path | |
def same_step_002_b7dbb05511774bd7bab385301d43b388_fn( | |
input_context_path, | |
output_context_path, | |
run_info="gAR9lC4=", | |
metadata_url="", | |
): | |
from base64 import urlsafe_b64encode, urlsafe_b64decode | |
from pathlib import Path | |
import datetime | |
import requests | |
import tempfile | |
import dill | |
import os | |
input_context = None | |
with Path(input_context_path).open("rb") as reader: | |
input_context = reader.read() | |
# Helper function for posting metadata to mlflow. | |
def post_metadata(json): | |
if metadata_url == "": | |
return | |
try: | |
req = requests.post(metadata_url, json=json) | |
req.raise_for_status() | |
except requests.exceptions.HTTPError as err: | |
print(f"Error posting metadata: {err}") | |
# Move to writable directory as user might want to do file IO. | |
# TODO: won't persist across steps, might need support in SDK? | |
os.chdir(tempfile.mkdtemp()) | |
# Load information about the current experiment run: | |
run_info = dill.loads(urlsafe_b64decode(run_info)) | |
# Post session context to mlflow. | |
if len(input_context) > 0: | |
input_context_str = urlsafe_b64encode(input_context) | |
post_metadata({ | |
"experiment_id": run_info["experiment_id"], | |
"run_id": run_info["run_id"], | |
"step_id": "same_step_002", | |
"metadata_type": "input", | |
"metadata_value": input_context_str, | |
"metadata_time": datetime.datetime.now().isoformat(), | |
}) | |
# User code for step, which we run in its own execution frame. | |
user_code = f""" | |
from base64 import urlsafe_b64decode | |
from pympler import asizeof | |
from tempfile import mktemp | |
import dill | |
import os | |
# Install dependencies from requirements file. | |
_req_file = mktemp() | |
with open(_req_file, "w") as _file: | |
_file.write(urlsafe_b64decode("bnVtcHkgPT0gMS4yMS4yCg==").decode()) | |
os.system("pip install -r " + _req_file) | |
# Load the exploding variables module: | |
{urlsafe_b64decode("import dill


class BaseExplodingVariable:
    """Base class for ExplodingVariable."""

    def __setattr__(self, *args):
        raise self.err

    def __getattr__(self, attr):
        raise self.err

    def __delattr__(self, *args):
        raise self.err

    def __repr__(self, *args):
        raise self.err

    def __str__(self, *args):
        raise self.err

    def __unicode__(self, *args):
        raise self.err

    def __bytes__(self, *args):
        raise self.err

    def __format__(self, *args):
        raise self.err

    def __lt__(self, *args):
        raise self.err

    def __le__(self, *args):
        raise self.err

    def __eq__(self, *args):
        raise self.err

    def __ne__(self, *args):
        raise self.err

    def __gt__(self, *args):
        raise self.err

    def __ge__(self, *args):
        raise self.err

    def __hash__(self, *args):
        raise self.err

    def __bool__(self, *args):
        raise self.err

    def __dir__(self, *args):
        raise self.err

    def __get__(self, *args):
        raise self.err

    def __set__(self, *args):
        raise self.err

    def __delete__(self, *args):
        raise self.err

    def __set_name__(self, *args):
        raise self.err

    def __instancecheck__(self, *args):
        raise self.err

    def __subclasscheck__(self, *args):
        raise self.err

    def __call__(self, *args):
        raise self.err

    def __len__(self, *args):
        raise self.err

    def __length_hint__(self, *args):
        raise self.err

    def __getitem__(self, *args):
        raise self.err

    def __setitem__(self, *args):
        raise self.err

    def __delitem__(self, *args):
        raise self.err

    def __missing__(self, *args):
        raise self.err

    def __item__(self, *args):
        raise self.err

    def __reversed__(self, *args):
        raise self.err

    def __contains__(self, *args):
        raise self.err

    def __add__(self, *args):
        raise self.err

    def __sub__(self, *args):
        raise self.err

    def __mul__(self, *args):
        raise self.err

    def __matmul__(self, *args):
        raise self.err

    def __truediv__(self, *args):
        raise self.err

    def __floordiv__(self, *args):
        raise self.err

    def __mod__(self, *args):
        raise self.err

    def __divmod__(self, *args):
        raise self.err

    def __pow__(self, *args):
        raise self.err

    def __lshift__(self, *args):
        raise self.err

    def __rshift__(self, *args):
        raise self.err

    def __and__(self, *args):
        raise self.err

    def __xor__(self, *args):
        raise self.err

    def __or__(self, *args):
        raise self.err

    def __radd__(self, *args):
        raise self.err

    def __rsub__(self, *args):
        raise self.err

    def __rmul__(self, *args):
        raise self.err

    def __rmatmul__(self, *args):
        raise self.err

    def __rtruediv__(self, *args):
        raise self.err

    def __rfloordiv__(self, *args):
        raise self.err

    def __rmod__(self, *args):
        raise self.err

    def __rdivmod__(self, *args):
        raise self.err

    def __rpow__(self, *args):
        raise self.err

    def __rlshift__(self, *args):
        raise self.err

    def __rrshift__(self, *args):
        raise self.err

    def __rand__(self, *args):
        raise self.err

    def __rxor__(self, *args):
        raise self.err

    def __ror__(self, *args):
        raise self.err

    def __iadd__(self, *args):
        raise self.err

    def __isub__(self, *args):
        raise self.err

    def __imul__(self, *args):
        raise self.err

    def __imatmul__(self, *args):
        raise self.err

    def __itruediv__(self, *args):
        raise self.err

    def __ifloordiv__(self, *args):
        raise self.err

    def __imod__(self, *args):
        raise self.err

    def __ipow__(self, *args):
        raise self.err

    def __ilshift__(self, *args):
        raise self.err

    def __irshift__(self, *args):
        raise self.err

    def __iand__(self, *args):
        raise self.err

    def __ixor__(self, *args):
        raise self.err

    def __ior__(self, *args):
        raise self.err

    def __neg__(self, *args):
        raise self.err

    def __pos__(self, *args):
        raise self.err

    def __abs__(self, *args):
        raise self.err

    def __invert__(self, *args):
        raise self.err

    def __complex__(self, *args):
        raise self.err

    def __int__(self, *args):
        raise self.err

    def __float__(self, *args):
        raise self.err

    def __index__(self, *args):
        raise self.err

    def __round__(self, *args):
        raise self.err

    def __trunc__(self, *args):
        raise self.err

    def __floor__(self, *args):
        raise self.err

    def __ceil__(self, *args):
        raise self.err

    def __enter__(self, *args):
        raise self.err

    def __exit__(self, *args):
        raise self.err


class ExplodingVariable(BaseExplodingVariable):
    """
    Exploding variables raise an error when anything is done with them. They
    are used to patch variables that cannot be serialised for various reasons
    when passing context between containers for different SAME steps. The
    error should inform the user of the reason the variable cannot be
    serialised - dill might not support it, it may use too much memory etc.

    Our approach is to patch all special methods on the class:
    https://docs.python.org/3/reference/datamodel.html#special-method-names
    """

    def __init__(self, err):
        self.err = err

    # Patch to allow setting 'self.err' in the constructor.
    def __setattr__(self, attr, value):
        if attr == "err":
            self.__dict__[attr] = value
            return

        super().__setattr__(self, attr, value)


# Custom deserialiser for ExplodingVariable that bypasses the explosion.
def deserialise(err):
    return ExplodingVariable(err)


# Custom serialiser for ExplodingVariable that bypasses the explosion.
@dill.register(ExplodingVariable)
def serialise(pickler, obj):
    pickler.save_reduce(deserialise, (obj.err,), obj=obj)
").decode()} | |
# Load session context into global namespace: | |
if { len(input_context) } > 0: | |
dill.load_session("{ input_context_path }") | |
# Run the user's notebook code: | |
{urlsafe_b64decode("YSA9IGEgKyA1CmIgPSBiICsgMTAgIyA0Ny41MzczMTM0MzI4CnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCmcgPSBzb21lX21hdGgoOCwgMjEpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCmogPSBnWzBdCmsgPSBnWzFdCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIyA1Ny41MzczMTM0MzI4CgpwcmludChmImo6IHtqfSIpCnByaW50KGYiazoge2t9IikKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjIDY3LjUzNzMxMzQzMjgKcHJpbnQoIjAuMC4yIikKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKcHJpbnQoZiJBY2Nlc3NpbmcgdGhlIHZhbHVlIG9mIEI6IHtifSIpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCg==").decode()} | |
# Various types of serialisation failures we'd like to inform the user of: | |
def _cannot_pickle_msg(k): | |
return "Variable '" + k +"' was defined in previous steps, but could not be serialised as it was not supported by 'dill'." | |
def _too_large_msg(k, mem): | |
return "Variable '" + k + "' was defined in previous steps, but could not be serialised as it used too much memory (" + str(mem) + "B)." | |
# Replace unserialisable variables with exploding variables so that the user | |
# knows why they cannot use them in future steps: | |
_all_keys = list(globals().keys()) | |
for _k in _all_keys: | |
if not dill.pickles(globals()[_k], safe=True): | |
globals()[_k] = ExplodingVariable(Exception(_cannot_pickle_msg(_k))) | |
continue | |
_mem = asizeof.asizeof(globals()[_k]) | |
if _mem >= 52428800: | |
globals()[_k] = ExplodingVariable(Exception(_too_large_msg(_k, _mem))) | |
continue | |
# Save new session context to disk for the next component: | |
dill.dump_session("{output_context_path}") | |
""" | |
# Runs the user code in a new execution frame. Context from the previous | |
# component in the run is loaded into the session dynamically, and we run | |
# with a single globals() namespace to simulate top-level execution. | |
exec(user_code, globals(), globals()) | |
# Post new session context to mlflow: | |
with Path(output_context_path).open("rb") as reader: | |
context = urlsafe_b64encode(reader.read()) | |
post_metadata({ | |
"experiment_id": run_info["experiment_id"], | |
"run_id": run_info["run_id"], | |
"step_id": "same_step_002", | |
"metadata_type": "output", | |
"metadata_value": context, | |
"metadata_time": datetime.datetime.now().isoformat(), | |
}) | |
import argparse | |
_parser = argparse.ArgumentParser(prog='Same step 002 b7dbb05511774bd7bab385301d43b388 fn', description='') | |
_parser.add_argument("--input-context", dest="input_context_path", type=str, required=True, default=argparse.SUPPRESS) | |
_parser.add_argument("--run-info", dest="run_info", type=str, required=False, default=argparse.SUPPRESS) | |
_parser.add_argument("--metadata-url", dest="metadata_url", type=str, required=False, default=argparse.SUPPRESS) | |
_parser.add_argument("--output-context", dest="output_context_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) | |
_parsed_args = vars(_parser.parse_args()) | |
_outputs = same_step_002_b7dbb05511774bd7bab385301d43b388_fn(**_parsed_args) | |
env: | |
- {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'} | |
- {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'} | |
- {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'} | |
- {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'} | |
image: library/python:3.9-slim-buster | |
inputs: | |
parameters: | |
- {name: AWS_ACCESS_KEY_ID} | |
- {name: AWS_SECRET_ACCESS_KEY} | |
- {name: MLFLOW_S3_ENDPOINT_URL} | |
- {name: MLFLOW_TRACKING_URI} | |
- {name: metadata_url} | |
- {name: run-info-fn-run_info} | |
artifacts: | |
- {name: same-step-001-60ad85fb69284d98992da58abd28344c-fn-output_context, path: /tmp/inputs/input_context/data} | |
outputs: | |
artifacts: | |
- {name: same-step-002-b7dbb05511774bd7bab385301d43b388-fn-output_context, path: /tmp/outputs/output_context/data} | |
metadata: | |
labels: | |
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12 | |
pipelines.kubeflow.org/pipeline-sdk-type: kfp | |
pipelines.kubeflow.org/enable_caching: "true" | |
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container": | |
{"args": ["--input-context", {"inputPath": "input_context"}, {"if": {"cond": | |
{"isPresent": "run_info"}, "then": ["--run-info", {"inputValue": "run_info"}]}}, | |
{"if": {"cond": {"isPresent": "metadata_url"}, "then": ["--metadata-url", | |
{"inputValue": "metadata_url"}]}}, "--output-context", {"outputPath": "output_context"}], | |
"command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip | |
install --quiet --no-warn-script-location ''dill'' ''requests'' ''pympler==1.0.1'' | |
''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' ''pandas'' ''plotly'' | |
''Requests'' ''scipy'' ''tensorflow'' || PIP_DISABLE_PIP_VERSION_CHECK=1 | |
python3 -m pip install --quiet --no-warn-script-location ''dill'' ''requests'' | |
''pympler==1.0.1'' ''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' | |
''pandas'' ''plotly'' ''Requests'' ''scipy'' ''tensorflow'' --user) && \"$0\" | |
\"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 | |
-u \"$program_path\" \"$@\"\n", "def _make_parent_dirs_and_return_path(file_path: | |
str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return | |
file_path\n\ndef same_step_002_b7dbb05511774bd7bab385301d43b388_fn(\n input_context_path,\n output_context_path,\n run_info=\"gAR9lC4=\",\n metadata_url=\"\",\n):\n from | |
base64 import urlsafe_b64encode, urlsafe_b64decode\n from pathlib import | |
Path\n import datetime\n import requests\n import tempfile\n import | |
dill\n import os\n\n input_context = None\n with Path(input_context_path).open(\"rb\") | |
as reader:\n input_context = reader.read()\n\n # Helper function | |
for posting metadata to mlflow.\n def post_metadata(json):\n if | |
metadata_url == \"\":\n return\n\n try:\n req | |
= requests.post(metadata_url, json=json)\n req.raise_for_status()\n except | |
requests.exceptions.HTTPError as err:\n print(f\"Error posting | |
metadata: {err}\")\n\n # Move to writable directory as user might want | |
to do file IO.\n # TODO: won''t persist across steps, might need support | |
in SDK?\n os.chdir(tempfile.mkdtemp())\n\n # Load information about | |
the current experiment run:\n run_info = dill.loads(urlsafe_b64decode(run_info))\n\n # | |
Post session context to mlflow.\n if len(input_context) > 0:\n input_context_str | |
= urlsafe_b64encode(input_context)\n post_metadata({\n \"experiment_id\": | |
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\": | |
\"same_step_002\",\n \"metadata_type\": \"input\",\n \"metadata_value\": | |
input_context_str,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n # | |
User code for step, which we run in its own execution frame.\n user_code | |
= f\"\"\"\nfrom base64 import urlsafe_b64decode\nfrom pympler import asizeof\nfrom | |
tempfile import mktemp\nimport dill\nimport os\n\n# Install dependencies | |
from requirements file.\n_req_file = mktemp()\nwith open(_req_file, \"w\") | |
as _file:\n _file.write(urlsafe_b64decode(\"bnVtcHkgPT0gMS4yMS4yCg==\").decode())\nos.system(\"pip | |
install -r \" + _req_file)\n\n# Load the exploding variables module:\n{urlsafe_b64decode(\"import dill


class BaseExplodingVariable:
    """Base class for ExplodingVariable."""

    def __setattr__(self, *args):
        raise self.err

    def __getattr__(self, attr):
        raise self.err

    def __delattr__(self, *args):
        raise self.err

    def __repr__(self, *args):
        raise self.err

    def __str__(self, *args):
        raise self.err

    def __unicode__(self, *args):
        raise self.err

    def __bytes__(self, *args):
        raise self.err

    def __format__(self, *args):
        raise self.err

    def __lt__(self, *args):
        raise self.err

    def __le__(self, *args):
        raise self.err

    def __eq__(self, *args):
        raise self.err

    def __ne__(self, *args):
        raise self.err

    def __gt__(self, *args):
        raise self.err

    def __ge__(self, *args):
        raise self.err

    def __hash__(self, *args):
        raise self.err

    def __bool__(self, *args):
        raise self.err

    def __dir__(self, *args):
        raise self.err

    def __get__(self, *args):
        raise self.err

    def __set__(self, *args):
        raise self.err

    def __delete__(self, *args):
        raise self.err

    def __set_name__(self, *args):
        raise self.err

    def __instancecheck__(self, *args):
        raise self.err

    def __subclasscheck__(self, *args):
        raise self.err

    def __call__(self, *args):
        raise self.err

    def __len__(self, *args):
        raise self.err

    def __length_hint__(self, *args):
        raise self.err

    def __getitem__(self, *args):
        raise self.err

    def __setitem__(self, *args):
        raise self.err

    def __delitem__(self, *args):
        raise self.err

    def __missing__(self, *args):
        raise self.err

    def __item__(self, *args):
        raise self.err

    def __reversed__(self, *args):
        raise self.err

    def __contains__(self, *args):
        raise self.err

    def __add__(self, *args):
        raise self.err

    def __sub__(self, *args):
        raise self.err

    def __mul__(self, *args):
        raise self.err

    def __matmul__(self, *args):
        raise self.err

    def __truediv__(self, *args):
        raise self.err

    def __floordiv__(self, *args):
        raise self.err

    def __mod__(self, *args):
        raise self.err

    def __divmod__(self, *args):
        raise self.err

    def __pow__(self, *args):
        raise self.err

    def __lshift__(self, *args):
        raise self.err

    def __rshift__(self, *args):
        raise self.err

    def __and__(self, *args):
        raise self.err

    def __xor__(self, *args):
        raise self.err

    def __or__(self, *args):
        raise self.err

    def __radd__(self, *args):
        raise self.err

    def __rsub__(self, *args):
        raise self.err

    def __rmul__(self, *args):
        raise self.err

    def __rmatmul__(self, *args):
        raise self.err

    def __rtruediv__(self, *args):
        raise self.err

    def __rfloordiv__(self, *args):
        raise self.err

    def __rmod__(self, *args):
        raise self.err

    def __rdivmod__(self, *args):
        raise self.err

    def __rpow__(self, *args):
        raise self.err

    def __rlshift__(self, *args):
        raise self.err

    def __rrshift__(self, *args):
        raise self.err

    def __rand__(self, *args):
        raise self.err

    def __rxor__(self, *args):
        raise self.err

    def __ror__(self, *args):
        raise self.err

    def __iadd__(self, *args):
        raise self.err

    def __isub__(self, *args):
        raise self.err

    def __imul__(self, *args):
        raise self.err

    def __imatmul__(self, *args):
        raise self.err

    def __itruediv__(self, *args):
        raise self.err

    def __ifloordiv__(self, *args):
        raise self.err

    def __imod__(self, *args):
        raise self.err

    def __ipow__(self, *args):
        raise self.err

    def __ilshift__(self, *args):
        raise self.err

    def __irshift__(self, *args):
        raise self.err

    def __iand__(self, *args):
        raise self.err

    def __ixor__(self, *args):
        raise self.err

    def __ior__(self, *args):
        raise self.err

    def __neg__(self, *args):
        raise self.err

    def __pos__(self, *args):
        raise self.err

    def __abs__(self, *args):
        raise self.err

    def __invert__(self, *args):
        raise self.err

    def __complex__(self, *args):
        raise self.err

    def __int__(self, *args):
        raise self.err

    def __float__(self, *args):
        raise self.err

    def __index__(self, *args):
        raise self.err

    def __round__(self, *args):
        raise self.err

    def __trunc__(self, *args):
        raise self.err

    def __floor__(self, *args):
        raise self.err

    def __ceil__(self, *args):
        raise self.err

    def __enter__(self, *args):
        raise self.err

    def __exit__(self, *args):
        raise self.err


class ExplodingVariable(BaseExplodingVariable):
    """
    Exploding variables raise an error when anything is done with them. They
    are used to patch variables that cannot be serialised for various reasons
    when passing context between containers for different SAME steps. The
    error should inform the user of the reason the variable cannot be
    serialised - dill might not support it, it may use too much memory etc.

    Our approach is to patch all special methods on the class:
    https://docs.python.org/3/reference/datamodel.html#special-method-names
    """

    def __init__(self, err):
        self.err = err

    # Patch to allow setting 'self.err' in the constructor.
    def __setattr__(self, attr, value):
        if attr == "err":
            self.__dict__[attr] = value
            return

        super().__setattr__(self, attr, value)


# Custom deserialiser for ExplodingVariable that bypasses the explosion.
def deserialise(err):
    return ExplodingVariable(err)


# Custom serialiser for ExplodingVariable that bypasses the explosion.
@dill.register(ExplodingVariable)
def serialise(pickler, obj):
    pickler.save_reduce(deserialise, (obj.err,), obj=obj)
\").decode()}\n\n# | |
Load session context into global namespace:\nif { len(input_context) } > | |
0:\n dill.load_session(\"{ input_context_path }\")\n\n# Run the user''s | |
notebook code:\n{urlsafe_b64decode(\"YSA9IGEgKyA1CmIgPSBiICsgMTAgIyA0Ny41MzczMTM0MzI4CnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCmcgPSBzb21lX21hdGgoOCwgMjEpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCmogPSBnWzBdCmsgPSBnWzFdCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIyA1Ny41MzczMTM0MzI4CgpwcmludChmImo6IHtqfSIpCnByaW50KGYiazoge2t9IikKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjIDY3LjUzNzMxMzQzMjgKcHJpbnQoIjAuMC4yIikKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKcHJpbnQoZiJBY2Nlc3NpbmcgdGhlIHZhbHVlIG9mIEI6IHtifSIpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCg==\").decode()}\n\n# | |
Various types of serialisation failures we''d like to inform the user of:\ndef | |
_cannot_pickle_msg(k):\n return \"Variable ''\" + k +\"'' was defined | |
in previous steps, but could not be serialised as it was not supported by | |
''dill''.\"\n\ndef _too_large_msg(k, mem):\n return \"Variable ''\" + | |
k + \"'' was defined in previous steps, but could not be serialised as it | |
used too much memory (\" + str(mem) + \"B).\"\n\n# Replace unserialisable | |
variables with exploding variables so that the user\n# knows why they cannot | |
use them in future steps:\n_all_keys = list(globals().keys())\nfor _k in | |
_all_keys:\n if not dill.pickles(globals()[_k], safe=True):\n globals()[_k] | |
= ExplodingVariable(Exception(_cannot_pickle_msg(_k)))\n continue\n\n _mem | |
= asizeof.asizeof(globals()[_k])\n if _mem >= 52428800:\n globals()[_k] | |
= ExplodingVariable(Exception(_too_large_msg(_k, _mem)))\n continue\n\n# | |
Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n # | |
Runs the user code in a new execution frame. Context from the previous\n # | |
component in the run is loaded into the session dynamically, and we run\n # | |
with a single globals() namespace to simulate top-level execution.\n exec(user_code, | |
globals(), globals())\n\n # Post new session context to mlflow:\n with | |
Path(output_context_path).open(\"rb\") as reader:\n context = urlsafe_b64encode(reader.read())\n post_metadata({\n \"experiment_id\": | |
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\": | |
\"same_step_002\",\n \"metadata_type\": \"output\",\n \"metadata_value\": | |
context,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\nimport | |
argparse\n_parser = argparse.ArgumentParser(prog=''Same step 002 b7dbb05511774bd7bab385301d43b388 | |
fn'', description='''')\n_parser.add_argument(\"--input-context\", dest=\"input_context_path\", | |
type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--run-info\", | |
dest=\"run_info\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--metadata-url\", | |
dest=\"metadata_url\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-context\", | |
dest=\"output_context_path\", type=_make_parent_dirs_and_return_path, required=True, | |
default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs | |
= same_step_002_b7dbb05511774bd7bab385301d43b388_fn(**_parsed_args)\n"], | |
"image": "library/python:3.9-slim-buster"}}, "inputs": [{"name": "input_context", | |
"type": "String"}, {"default": "gAR9lC4=", "name": "run_info", "optional": | |
true}, {"default": "", "name": "metadata_url", "optional": true}], "name": | |
"Same step 002 b7dbb05511774bd7bab385301d43b388 fn", "outputs": [{"name": | |
"output_context", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{}', | |
pipelines.kubeflow.org/arguments.parameters: '{"metadata_url": "{{inputs.parameters.metadata_url}}", | |
"run_info": "{{inputs.parameters.run-info-fn-run_info}}"}', pipelines.kubeflow.org/max_cache_staleness: P0D} | |
arguments: | |
parameters: | |
- {name: context, value: ''} | |
- {name: metadata_url, value: ''} | |
- {name: AWS_ACCESS_KEY_ID, value: minio} | |
- {name: AWS_SECRET_ACCESS_KEY, value: minio123} | |
- {name: MLFLOW_S3_ENDPOINT_URL, value: 'http://combinator-minio.mlflow.svc.cluster.local:9000'} | |
- {name: MLFLOW_TRACKING_URI, value: 'http://combinator-mlflow.mlflow.svc.cluster.local:5000'} | |
serviceAccountName: pipeline-runner |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment