@htahir1
Created November 30, 2023 13:22
A quick-and-dirty ZenML scikit-learn pipeline, dumped as a single file.
# {% include 'template/license_header' %}

import random
from typing import List, Optional, Tuple, Union

import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
from typing_extensions import Annotated

from zenml import pipeline, step
from zenml.logger import get_logger

logger = get_logger(__name__)
class NADropper:
    """Support class to drop NA values in sklearn Pipeline."""

    def fit(self, *args, **kwargs):
        return self

    def transform(self, X: Union[pd.DataFrame, pd.Series]):
        return X.dropna()


class ColumnsDropper:
    """Support class to drop specific columns in sklearn Pipeline."""

    def __init__(self, columns):
        self.columns = columns

    def fit(self, *args, **kwargs):
        return self

    def transform(self, X: Union[pd.DataFrame, pd.Series]):
        return X.drop(columns=self.columns)


class DataFrameCaster:
    """Support class to cast type back to pd.DataFrame in sklearn Pipeline."""

    def __init__(self, columns):
        self.columns = columns

    def fit(self, *args, **kwargs):
        return self

    def transform(self, X):
        return pd.DataFrame(X, columns=self.columns)
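
# Quick standalone sanity check of the support transformers above (a sketch,
# not part of the original gist): chain them in a plain sklearn Pipeline on a
# made-up DataFrame. Call _sanity_check_preprocessing() manually if needed.
def _sanity_check_preprocessing() -> pd.DataFrame:
    toy = pd.DataFrame(
        {"a": [1.0, 2.0, None], "b": [10.0, 20.0, 30.0], "drop_me": [0, 0, 0]}
    )
    check_pipeline = Pipeline(
        [
            ("drop_na", NADropper()),  # removes the row containing the NA value
            ("drop_columns", ColumnsDropper(["drop_me"])),  # removes the unwanted column
            ("normalize", MinMaxScaler()),  # scales numeric columns to [0, 1]
            ("cast", DataFrameCaster(["a", "b"])),  # casts the result back to a DataFrame
        ]
    )
    # Expected result: two rows, columns "a" and "b" scaled to [0, 1].
    return check_pipeline.fit_transform(toy)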
# {% include 'template/license_header' %}
@step
def data_preprocessor(
    dataset_trn: pd.DataFrame,
    dataset_tst: pd.DataFrame,
    drop_na: Optional[bool] = None,
    normalize: Optional[bool] = None,
    drop_columns: Optional[List[str]] = None,
) -> Tuple[
    Annotated[pd.DataFrame, "dataset_trn"],
    Annotated[pd.DataFrame, "dataset_tst"],
    Annotated[Pipeline, "preprocess_pipeline"],
]:
    """Data preprocessor step.

    This is an example of a data preprocessor step that prepares the data so
    that it is suitable for model training. It takes in a dataset as an input
    step artifact and performs any necessary preprocessing steps like cleaning,
    feature engineering, feature selection, etc. It then returns the processed
    dataset as a step output artifact.

    This step is parameterized, which allows you to configure the step
    independently of the step code, before running it in a pipeline. In this
    example, the step can be configured to drop NA values, drop some columns
    and normalize numerical columns. See the documentation for more
    information:

    https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines

    Args:
        dataset_trn: The train dataset.
        dataset_tst: The test dataset.
        drop_na: If `True`, all NA rows will be dropped.
        normalize: If `True`, all numeric fields will be normalized.
        drop_columns: List of column names to drop.

    Returns:
        The processed datasets (dataset_trn, dataset_tst) and the fitted
        `Pipeline` object.
    """
    ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
    # We use the sklearn pipeline to chain together multiple preprocessing steps
    preprocess_pipeline = Pipeline([("passthrough", "passthrough")])
    if drop_na:
        preprocess_pipeline.steps.append(("drop_na", NADropper()))
    if drop_columns:
        # Drop columns
        preprocess_pipeline.steps.append(("drop_columns", ColumnsDropper(drop_columns)))
    if normalize:
        # Normalize the data
        preprocess_pipeline.steps.append(("normalize", MinMaxScaler()))
    preprocess_pipeline.steps.append(("cast", DataFrameCaster(dataset_trn.columns)))
    dataset_trn = preprocess_pipeline.fit_transform(dataset_trn)
    dataset_tst = preprocess_pipeline.transform(dataset_tst)
    ### YOUR CODE ENDS HERE ###
    return dataset_trn, dataset_tst, preprocess_pipeline
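
# Sketch (not in the original gist): because the fitted `preprocess_pipeline`
# is returned as its own output artifact, the exact same transformations can
# be replayed on unseen data at inference time without refitting. The helper
# below is hypothetical; `new_data` is assumed to have the same raw columns
# as `dataset_trn`.
def apply_preprocessing(preprocess_pipeline: Pipeline, new_data: pd.DataFrame) -> pd.DataFrame:
    """Replay a fitted preprocessing pipeline on new data (transform only)."""
    return preprocess_pipeline.transform(new_data)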
# {% include 'template/license_header' %}
@step
def data_splitter(
    dataset: pd.DataFrame, test_size: float = 0.2
) -> Tuple[
    Annotated[pd.DataFrame, "raw_dataset_trn"],
    Annotated[pd.DataFrame, "raw_dataset_tst"],
]:
    """Dataset splitter step.

    This is an example of a dataset splitter step that splits the data into
    train and test sets before passing it to an ML model.

    This step is parameterized, which allows you to configure the step
    independently of the step code, before running it in a pipeline. In this
    example, the step can be configured to use different test set sizes. See
    the documentation for more information:

    https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines

    Args:
        dataset: Dataset read from source.
        test_size: Fraction in 0.0..1.0 defining the portion of the test set.

    Returns:
        The split dataset: dataset_trn, dataset_tst.
    """
    ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
    dataset_trn, dataset_tst = train_test_split(
        dataset,
        test_size=test_size,
        random_state=42,
        shuffle=True,
    )
    dataset_trn = pd.DataFrame(dataset_trn, columns=dataset.columns)
    dataset_tst = pd.DataFrame(dataset_tst, columns=dataset.columns)
    ### YOUR CODE ENDS HERE ###
    return dataset_trn, dataset_tst
@step
def data_loader(
    random_state: int, is_inference: bool = False
) -> Tuple[
    Annotated[pd.DataFrame, "dataset"],
    Annotated[str, "target"],
    Annotated[int, "random_state"],
]:
    """Dataset reader step.

    This is an example of a dataset reader step that loads the Breast Cancer
    dataset.

    This step is parameterized, which allows you to configure the step
    independently of the step code, before running it in a pipeline. In this
    example, the step can be configured with the sampling random state and
    whether the target column should be dropped for inference. See the
    documentation for more information:

    https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines

    Args:
        random_state: Random state for sampling the inference subset.
        is_inference: If `True`, a small subset is returned and the target
            column is removed from the dataset.

    Returns:
        The dataset artifact as a Pandas DataFrame, the name of the target
        column, and the random state.
    """
    ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
    dataset = load_breast_cancer(as_frame=True)
    inference_size = int(len(dataset.target) * 0.05)
    target = "target"
    dataset: pd.DataFrame = dataset.frame
    inference_subset = dataset.sample(inference_size, random_state=random_state)
    if is_inference:
        dataset = inference_subset
        dataset.drop(columns=target, inplace=True)
    else:
        dataset.drop(inference_subset.index, inplace=True)
    dataset.reset_index(drop=True, inplace=True)
    logger.info(f"Dataset with {len(dataset)} records loaded!")
    ### YOUR CODE ENDS HERE ###
    return dataset, target, random_state
@pipeline
def _training(
    test_size: float = 0.2,
    drop_na: Optional[bool] = None,
    normalize: Optional[bool] = None,
    drop_columns: Optional[List[str]] = None,
):
    """Model training pipeline.

    This is a pipeline that loads the data, processes it and splits it into
    train and test sets, then searches for the best hyperparameters, and
    trains and evaluates a model.

    Args:
        test_size: Size of the holdout set for training, 0.0..1.0.
        drop_na: If `True`, NA values will be removed from the dataset.
        normalize: If `True`, the dataset will be normalized with MinMaxScaler.
        drop_columns: List of columns to drop from the dataset.
    """
    ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
    # Link all the steps together by calling them and passing the output
    # of one step as the input of the next step.
    raw_data, target, _ = data_loader(random_state=random.randint(0, 100))
    dataset_trn, dataset_tst = data_splitter(
        dataset=raw_data,
        test_size=test_size,
    )
    dataset_trn, dataset_tst, _ = data_preprocessor(
        dataset_trn=dataset_trn,
        dataset_tst=dataset_tst,
        drop_na=drop_na,
        normalize=normalize,
        drop_columns=drop_columns,
    )
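
# Entrypoint sketch (not part of the original gist). Assumes a ZenML release
# (>= 0.40) where calling a @pipeline-decorated function triggers a pipeline
# run on the active stack; the parameter values below are arbitrary examples.
if __name__ == "__main__":
    _training(
        test_size=0.2,
        drop_na=True,
        normalize=True,
        drop_columns=None,
    )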