@slopp
Last active March 23, 2024 01:51
Dynamic pipeline that invokes k8s ops

Dynamic Pipeline

This example shows the pseudo-code for a Dagster pipeline that:

  1. Accepts the path to a raw dataset as a string
  2. Runs a step to break the raw dataset into partitions
  3. For each partition, runs a series of two processing steps. Each processing step calls out to a Docker container, supplying the partition key as an input argument. The partitions run in parallel before being collected by a final processing step that operates on all of the partitions.

To run the pipeline:

dagster dev -f dynamic_pipeline.py
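
The config values for ProcessingConfig can be supplied through the Launchpad (shown in the screenshots below), or the job can be exercised programmatically. The following is a minimal sketch; the root_data_path value is a hypothetical placeholder:

# Minimal sketch: launch the job in-process with run config.
# "path/to/raw/data" is a placeholder; point root_data_path at your own data.
from dynamic_pipeline import run_pipeline_job

result = run_pipeline_job.execute_in_process(
    run_config={
        "ops": {
            "prepare_partitions": {
                "config": {"root_data_path": "path/to/raw/data", "batch_size": 4}
            }
        }
    }
)
assert result.success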

What it looks like:

The job structure
[screenshot]

The job configuration
[screenshot]

The job run
[screenshot]

from typing import List
import random
import string

from dagster import (
    Config,
    Definitions,
    DynamicOut,
    DynamicOutput,
    OpExecutionContext,
    job,
    op,
)
from dagster_k8s import execute_k8s_job
from pydantic import Field


def split_raw_data_into_partitions(context: OpExecutionContext, path: str, batch_size: int) -> List[str]:
    """Function that calls an external system to stage raw data into a series of partitions."""
    context.log.info(f"Splitting {path} into {batch_size} partitions")
    # Mapping keys for dynamic outputs must be unique, so sample without replacement
    return random.sample(string.ascii_letters, batch_size)


class ProcessingConfig(Config):
    batch_size: int = Field(10, description="Number of batches for parallel processing")
    root_data_path: str = Field(description="Path to raw data to process")


@op(out=DynamicOut())
def prepare_partitions(context: OpExecutionContext, config: ProcessingConfig):
    """Given raw data and a batch size, make partitions."""
    partition_keys = split_raw_data_into_partitions(context, config.root_data_path, config.batch_size)
    for key in partition_keys:
        yield DynamicOutput(key, mapping_key=key)


@op
def first_compute_piece(context: OpExecutionContext, partition_key: str):
    """Run the first piece of processing for a partition using image x."""
    context.log.info(f"Launching first container to process {partition_key}")
    # Guarded off so the example runs without a cluster; remove the guard to
    # actually launch the Kubernetes job
    if False:
        execute_k8s_job(
            context=context,
            image="busybox",
            command=["/bin/sh", "-c"],
            args=[f"echo {partition_key}"],
        )
    return partition_key


@op
def second_compute_piece(context: OpExecutionContext, partition_key: str):
    """Run the second piece of processing for a partition using image x."""
    context.log.info(f"Launching second container to process {partition_key}")
    if False:
        execute_k8s_job(
            context=context,
            image="busybox",
            command=["/bin/sh", "-c"],
            args=[f"echo {partition_key}"],
        )
    return partition_key


@op
def merge_and_analyze(context: OpExecutionContext, partitions: List[str]):
    """Do some final processing that needs the result of all the partitions."""
    context.log.info(f"Finished processing the partitions: {partitions}")
    return partitions


@job
def run_pipeline_job():
    pieces = prepare_partitions()
    results = pieces.map(first_compute_piece).map(second_compute_piece)
    merge_and_analyze(results.collect())


defs = Definitions(jobs=[run_pipeline_job])