Skip to content

Instantly share code, notes, and snippets.

@jrosell
Last active August 30, 2025 19:26
Show Gist options
  • Save jrosell/e283687c5db990ff6bd65092021fa512 to your computer and use it in GitHub Desktop.
Save jrosell/e283687c5db990ff6bd65092021fa512 to your computer and use it in GitHub Desktop.
Example of a machine learning pipeline in Python using dagster to skip recomputation of assets whose input files' modification dates have not changed since the previous run. Also an example in R using the {targets} package.
from dagster import asset, Definitions, FilesystemIOManager, SkipReason
from sklearn.linear_model import LinearRegression
import pandas as pd
import joblib
import os
import time
# Directory where every cached artifact (CSVs, model pickle) is written.
DATA_DIR = "data"
# Create it up front; exist_ok makes this idempotent across runs.
os.makedirs(DATA_DIR, exist_ok=True)
# Helper: decide whether an asset's cached output can be reused.
def skip_if_up_to_date(output_file: str, input_files: list):
    """Return a SkipReason when *output_file* is newer than all inputs.

    Returns None (meaning "recompute") when the output is missing, or when
    any input file is missing or has a modification time later than the
    output's.
    """
    if not os.path.exists(output_file):
        return None
    out_mtime = os.path.getmtime(output_file)
    stale = any(
        not os.path.exists(path) or os.path.getmtime(path) > out_mtime
        for path in input_files
    )
    if stale:
        return None
    return SkipReason(f"{output_file} is up-to-date with inputs: {input_files}")
# Step 1: raw_data
@asset
def raw_data():
    """Produce the raw dataset, cached as data/raw_data.csv.

    Returns a pandas DataFrame with columns "x" and "y". If the cached CSV
    already exists it is loaded and returned, so downstream assets always
    receive a DataFrame. (The original returned a SkipReason object here,
    which clean_data would then try to call .dropna() on — a crash.)
    """
    output_file = os.path.join(DATA_DIR, "raw_data.csv")
    if os.path.exists(output_file):
        return pd.read_csv(output_file)
    time.sleep(10)  # simulate a slow data source
    df = pd.DataFrame({"x": [1, 2, 3, None, 5], "y": [2, 4, 6, 8, 10]})
    df.to_csv(output_file, index=False)
    return df
# Step 2: clean_data
@asset
def clean_data(raw_data):
    """Drop rows with missing values, cached as data/clean_data.csv.

    Returns a pandas DataFrame. When clean_data.csv is already newer than
    raw_data.csv the cached file is loaded and returned, so train_model
    always receives a DataFrame. (The original returned a SkipReason on the
    skip path, which train_model would then try to index — a crash.)
    """
    output_file = os.path.join(DATA_DIR, "clean_data.csv")
    input_file = os.path.join(DATA_DIR, "raw_data.csv")
    if skip_if_up_to_date(output_file, [input_file]):
        return pd.read_csv(output_file)
    time.sleep(10)  # simulate slow processing
    clean_df = raw_data.dropna()
    clean_df.to_csv(output_file, index=False)
    return clean_df
# Step 3: train_model
@asset
def train_model(clean_data):
    """Fit a LinearRegression on the cleaned data and persist it.

    Always returns the path of the pickled model file. When the pickle is
    already newer than clean_data.csv the existing path is returned
    unchanged. (The original returned a SkipReason on the skip path while
    the compute path returned a str — an inconsistent return type.)
    """
    output_file = os.path.join(DATA_DIR, "model_lm.pkl")
    input_file = os.path.join(DATA_DIR, "clean_data.csv")
    if skip_if_up_to_date(output_file, [input_file]):
        return output_file
    time.sleep(10)  # simulate slow training
    X = clean_data[["x"]]
    y = clean_data["y"]
    model = LinearRegression().fit(X, y)
    joblib.dump(model, output_file)
    return output_file
# Definitions
# Wire the three assets into a single Dagster Definitions object; the
# FilesystemIOManager persists materialized asset outputs under DATA_DIR.
defs = Definitions(
assets=[raw_data, clean_data, train_model],
resources={"fs": FilesystemIOManager(base_dir=DATA_DIR)},
)
library(targets)
library(tibble)
library(readr)
library(glmnet)
# Step 1: load raw data
# Builds the toy dataset (one NA in x so the cleaning step has work to do).
load_raw_data <- function() {
  Sys.sleep(10) # simulate a slow data source
  xs <- c(1, 2, 3, NA, 5)
  ys <- c(2, 4, 6, 8, 10)
  tibble(x = xs, y = ys)
}
# Step 2: clean data
# Keeps only the rows of raw_data with no missing values.
clean_raw_data <- function(raw_data) {
  Sys.sleep(10) # simulate processing
  keep <- complete.cases(raw_data)
  raw_data[keep, ]
}
# Step 3: train model
# Fits a linear model of y on all other columns, saves it to disk, and
# returns the file path so {targets} can track the file as an artifact.
train_model <- function(clean_data) {
  Sys.sleep(10) # simulate slow training
  fitted <- lm(y ~ ., data = clean_data)
  path <- "model_lm.rds"
  saveRDS(fitted, path)
  path
}
# Pipeline definition: {targets} skips any target whose upstream
# dependencies have not changed since the last tar_make().
list(
tar_target(
raw_data,
load_raw_data(),
format = "rds" # automatically cached
),
tar_target(
clean_data,
clean_raw_data(raw_data),
format = "rds"
),
tar_target(
trained_model,
train_model(clean_data),
format = "file" # tracks model_lm.rds as a file
)
)
@jrosell
Copy link
Author

jrosell commented Aug 29, 2025

For python:

$ uv venv
$ uv pip install --upgrade pip
$ uv pip install --upgrade dagster scikit-learn pandas
$ uv run dagster job execute -f pipeline.py -j __ASSET_JOB

For R:

$ Rscript -e "pak::pak(c('targets', 'tibble', 'readr', 'glmnet'))"
$ Rscript -e "targets::tar_make(script = 'pipeline.R')"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment