Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save gregsheremeta/4cd6425efc3072000ad5bfd1df5b1b58 to your computer and use it in GitHub Desktop.
Save gregsheremeta/4cd6425efc3072000ad5bfd1df5b1b58 to your computer and use it in GitHub Desktop.
"""Test pipeline to exercise various data flow mechanisms."""
import kfp
"""Producer"""
def send_file(
outgoing_file: kfp.components.OutputPath(),
):
def create_test_file(path, size):
with open(path, 'wb') as file:
file.seek(size)
file.write(b'0')
create_test_file(outgoing_file, 1024 * 1024 * 50) # this causes error
# create_test_file(outgoing_file, 50) # this works fine
"""Consumer"""
def receive_file(
incoming_file: kfp.components.InputPath(),
save_artifact: kfp.components.OutputPath("MyArtifact"),
):
import os
import shutil
print("reading %s, size is %s" % (incoming_file, os.path.getsize(incoming_file)))
with open(incoming_file, "rb") as f:
b = f.read(1)
print("read byte: %s" % b)
f.close()
print("copying in %s to out %s" % (incoming_file, save_artifact))
shutil.copyfile(incoming_file, save_artifact)
def dummy_step(
incoming_file: kfp.components.InputPath(),
):
print("hi there")
"""Build the producer component"""
send_file_op = kfp.components.create_component_from_func(
send_file,
base_image="registry.access.redhat.com/ubi8/python-38",
)
"""Build the consumer component"""
receive_file_op = kfp.components.create_component_from_func(
receive_file,
base_image="registry.access.redhat.com/ubi8/python-38",
)
dummy_step_op = kfp.components.create_component_from_func(
dummy_step,
base_image="registry.access.redhat.com/ubi8/python-38",
)
"""Wire up the pipeline"""
@kfp.dsl.pipeline(
name="Test Data Passing Pipeline 1",
)
def wire_up_pipeline():
send_file_task = send_file_op()
receive_file_task = receive_file_op(
send_file_task.output,
)
dummy_step_task = dummy_step_op(
receive_file_task.output,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment