Skip to content

Instantly share code, notes, and snippets.

@MatthewRalston
Created December 23, 2022 13:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MatthewRalston/e6ef762be10751e806abee90b5df7565 to your computer and use it in GitHub Desktop.
Save MatthewRalston/e6ef762be10751e806abee90b5df7565 to your computer and use it in GitHub Desktop.
A simple Kubeflow pipeline step that determines whether a file needs decompression
#!/bin/env python
import os
import sys
import argparse
import kfp.dsl as dsl
import kfp.components as comp
import kfp
import logging
# Module-level logger handle. It stays None on import; the script entry
# point rebinds it to the configured root logger. (A `global` statement
# at module scope has no effect, so none is needed here.)
logger = None
def get_root_logger(level):
    """Configure root logging for a verbosity level and return the root logger.

    Args:
        level: Verbosity index: 0 selects WARNING, 1 selects INFO and
            2 selects DEBUG.

    Returns:
        The root logger, after ``logging.basicConfig`` has been called.

    Raises:
        TypeError: If ``level`` is below 0 or above 2. The exception type
            is kept as ``TypeError`` so existing callers keep working.
    """
    levels = (logging.WARNING, logging.INFO, logging.DEBUG)
    if level < 0 or level > 2:
        raise TypeError(
            "{0}.get_root_logger expects a verbosity between 0-2".format(__file__)
        )
    logging.basicConfig(
        level=levels[level],
        format="%(levelname)s: %(asctime)s %(funcName)s L%(lineno)s| %(message)s",
        # 24-hour clock: the previous "%I" (12-hour) directive had no AM/PM
        # marker, so morning and afternoon timestamps looked identical.
        datefmt="%Y/%m/%d %H:%M:%S",
    )
    return logging.getLogger()
# Path of the "components" directory that sits next to this file.
components_dir = os.path.join(os.path.split(__file__)[0], "components")
#gunzip = comp.load_component_from_file(os.path.join(components_dir, "gunzip.yaml"))
@dsl.component
def is_unzip_needed(infile: str) -> bool:
    """Report whether a file name carries the gzip ``.gz`` suffix.

    Only the name is inspected; the file itself is never opened.

    Args:
        infile: File name or path to check.

    Returns:
        True when ``infile`` ends with ".gz", otherwise False.

    Raises:
        TypeError: If ``infile`` is not a string.
    """
    # isinstance (rather than an exact type() identity check) also accepts
    # str subclasses, which behave like strings for endswith().
    if not isinstance(infile, str):
        raise TypeError("pipeline.is_unzip_needed() expects a str as its only positional argument")
    return infile.endswith(".gz")
@dsl.container_component
def gunzip(infile: dsl.InputPath(str), outfile: dsl.OutputPath(str)):
    """Decompress ``infile`` into ``outfile`` inside a container.

    Infile like 'path/to/example.txt.gz', outfile like 'path/to/example.txt'.
    If decompression fails (for example the input is not gzip data), the
    input is copied to ``outfile`` unchanged so the output always exists.

    Args:
        infile: Path of the (possibly gzip-compressed) input file.
        outfile: Path the decompressed content is written to.

    Returns:
        A ``dsl.ContainerSpec`` describing the container invocation.
    """
    # The command list is executed directly, not through a shell, so
    # "$1" and "||" only work when the script is handed to `sh -c`.
    # The extra 'gunzip' token fills $0, so the entries of `args` arrive
    # as $1 (infile) and $2 (outfile).
    script = (
        'mkdir -p "$(dirname "$2")" && '
        '{ gunzip -c "$1" > "$2" || cp "$1" "$2"; }'
    )
    return dsl.ContainerSpec(
        image='bitnami/minideb:latest',
        command=['sh', '-c', script, 'gunzip'],
        args=[infile, outfile]
    )
@dsl.pipeline(
    name='kubeflow-barebone-demo',
    description='kubeflow demo with minimal setup'
)
def rnaseq_pipeline(fastq1: str):
    """Decompress the FASTQ input when its name indicates gzip compression.

    Args:
        fastq1: Name or path of the first FASTQ file; it may end in ".gz".
    """
    # Step 1: decide from the file name whether decompression is needed.
    unzip_is_needed = is_unzip_needed(infile=fastq1)

    # Step 2: gunzip only when step 1 reported True. A plain Python `if`
    # would be evaluated while the pipeline is compiled, where the task
    # object is always truthy; dsl.Condition defers the check to run time.
    # NOTE(review): assumes the installed KFP accepts a bool comparison
    # here and a str parameter for gunzip's InputPath - confirm.
    with dsl.Condition(unzip_is_needed.output == True, name='fastq1-needs-gunzip'):  # noqa: E712
        gunzip(infile=fastq1)
#gunzip1 = gunzip(infile=fastq1, outfile)
# with dsl.Condition(str(infile).endswith(".gz"), 'fastq1-needs-gunzip'):
# gunzip1 = gunzip(infile=infile)
# print("GUNZIPPED")
# print(gunzip1.output)
# with dsl.Condition(".gz" not in str(fastq1)):
# print("NOT GUNZIPPED: {0}".format(str(fastq1)))
# # Step 2: evaluation component
# evaluation = dsl.ContainerOp(
# name='evaluation',
# image='ubuntu:latest',
# command=[
# 'sh', '-c',
# 'echo "evaluating" && '
# f'echo \'load model from: {training.outputs["model_save_path"]}\' && '
# 'sleep 10 && '
# 'echo "accuracy: 0.$(shuf -i 0-99 -n 1)" > /accuracy.txt'
# ],
# file_outputs={'accuracy': '/accuracy.txt'}
# )
if __name__ == "__main__":
    # Import the compiler submodule explicitly rather than relying on
    # `import kfp` having loaded `kfp.compiler` as a side effect.
    from kfp import compiler

    # Verbosity 2 selects DEBUG-level logging.
    logger = get_root_logger(2)

    # Compile the pipeline definition to a YAML package in the working
    # directory.
    compiler.Compiler().compile(
        pipeline_func=rnaseq_pipeline,
        package_path='pipeline.yaml',
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment