Skip to content

Instantly share code, notes, and snippets.

@gregsheremeta
Last active July 17, 2023 21:03
Show Gist options
  • Save gregsheremeta/b603c65fef11b78b8b7248b151bd8e7d to your computer and use it in GitHub Desktop.
Save gregsheremeta/b603c65fef11b78b8b7248b151bd8e7d to your computer and use it in GitHub Desktop.
a3 - same as a1, but no string in OutputPath
"""Test pipeline to exercise various data flow mechanisms."""
import kfp
"""Producer"""
def send_file(
outgoingfile: kfp.components.OutputPath(),
):
import urllib.request
print("starting download...")
urllib.request.urlretrieve("http://212.183.159.230/20MB.zip", outgoingfile)
print("done")
"""Consumer"""
def receive_file(
incomingfile: kfp.components.InputPath(),
saveartifact: kfp.components.OutputPath(),
):
import os
import shutil
print("reading %s, size is %s" % (incomingfile, os.path.getsize(incomingfile)))
with open(incomingfile, "rb") as f:
b = f.read(1)
print("read byte: %s" % b)
f.close()
print("copying in %s to out %s" % (incomingfile, saveartifact))
shutil.copyfile(incomingfile, saveartifact)
"""Build the producer component"""
send_file_op = kfp.components.create_component_from_func(
send_file,
base_image="registry.access.redhat.com/ubi8/python-38",
)
"""Build the consumer component"""
receive_file_op = kfp.components.create_component_from_func(
receive_file,
base_image="registry.access.redhat.com/ubi8/python-38",
)
"""Wire up the pipeline"""
@kfp.dsl.pipeline(
name="Test Data Passing Pipeline 1",
)
def wire_up_pipeline():
import json
send_file_task = send_file_op()
receive_file_task = receive_file_op(
send_file_task.output,
).add_pod_annotation(name='artifact_outputs', value=json.dumps(['saveartifact']))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment