Created
November 30, 2023 13:22
-
-
Save htahir1/4ec650548af5a42ab66dd4c2bf1ceea6 to your computer and use it in GitHub Desktop.
A quick-and-dirty ZenML scikit-learn pipeline, dumped as-is
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# {% include 'template/license_header' %} | |
from typing import Tuple | |
from typing_extensions import Annotated | |
import pandas as pd | |
from sklearn.model_selection import train_test_split | |
from zenml import step | |
import random | |
from typing import Optional, List | |
from typing import List, Optional, Tuple | |
from typing_extensions import Annotated | |
import pandas as pd | |
from sklearn.pipeline import Pipeline | |
from sklearn.preprocessing import MinMaxScaler | |
from zenml import step | |
from typing import Tuple | |
import pandas as pd | |
from sklearn.datasets import load_breast_cancer | |
from typing_extensions import Annotated | |
from zenml import step | |
from zenml.logger import get_logger | |
logger = get_logger(__name__) | |
from zenml import pipeline, step | |
from zenml.logger import get_logger | |
logger = get_logger(__name__) | |
# {% include 'template/license_header' %} | |
from typing import Union | |
import pandas as pd | |
class NADropper:
    """Support class to drop NA values in sklearn Pipeline.

    The transformer is stateless: `fit` learns nothing and `transform`
    simply delegates to pandas' `dropna`.
    """

    def fit(self, *args, **kwargs) -> "NADropper":
        """Do nothing and return `self` so the object can be chained.

        Args:
            *args: Ignored; accepted so callers may pass data positionally.
            **kwargs: Ignored.

        Returns:
            This instance, unchanged.
        """
        return self

    def transform(
        self, X: Union[pd.DataFrame, pd.Series]
    ) -> Union[pd.DataFrame, pd.Series]:
        """Return `X` without its missing values.

        Args:
            X: The pandas object to clean.

        Returns:
            The result of `X.dropna()`; `X` itself is not modified.
        """
        return X.dropna()
class ColumnsDropper:
    """Support class to drop specific columns in sklearn Pipeline."""

    def __init__(self, columns):
        """Remember which columns to remove.

        Args:
            columns: Column label(s) that `transform` passes to
                `drop(columns=...)`.
        """
        self.columns = columns

    def fit(self, *args, **kwargs) -> "ColumnsDropper":
        """Do nothing and return `self` so the object can be chained.

        Args:
            *args: Ignored; accepted so callers may pass data positionally.
            **kwargs: Ignored.

        Returns:
            This instance, unchanged.
        """
        return self

    def transform(
        self, X: Union[pd.DataFrame, pd.Series]
    ) -> Union[pd.DataFrame, pd.Series]:
        """Return `X` without the configured columns.

        Args:
            X: The pandas object to strip columns from.

        Returns:
            The result of `X.drop(columns=self.columns)`; `X` itself is not
            modified.
        """
        return X.drop(columns=self.columns)
class DataFrameCaster:
    """Support class to cast type back to pd.DataFrame in sklearn Pipeline."""

    def __init__(self, columns):
        """Remember the column labels for the resulting frame.

        Args:
            columns: Labels handed to `pd.DataFrame(..., columns=...)` in
                `transform`.
        """
        self.columns = columns

    def fit(self, *args, **kwargs) -> "DataFrameCaster":
        """Do nothing and return `self` so the object can be chained.

        Args:
            *args: Ignored; accepted so callers may pass data positionally.
            **kwargs: Ignored.

        Returns:
            This instance, unchanged.
        """
        return self

    def transform(self, X) -> pd.DataFrame:
        """Wrap `X` in a DataFrame that uses the configured column labels.

        Args:
            X: Any data accepted by the `pd.DataFrame` constructor.

        Returns:
            `pd.DataFrame(X, columns=self.columns)`.
        """
        return pd.DataFrame(X, columns=self.columns)
# {% include 'template/license_header' %} | |
# {% include 'template/license_header' %} | |
@step
def data_preprocessor(
    dataset_trn: pd.DataFrame,
    dataset_tst: pd.DataFrame,
    drop_na: Optional[bool] = None,
    normalize: Optional[bool] = None,
    drop_columns: Optional[List[str]] = None,
) -> Tuple[
    Annotated[pd.DataFrame, "dataset_trn"],
    Annotated[pd.DataFrame, "dataset_tst"],
    Annotated[Pipeline, "preprocess_pipeline"],
]:
    """Data preprocessor step.

    This is an example of a data processor step that prepares the data so that
    it is suitable for model training. It takes in a dataset as an input step
    artifact and performs any necessary preprocessing steps like cleaning,
    feature engineering, feature selection, etc. It then returns the processed
    dataset as a step output artifact.

    This step is parameterized, which allows you to configure the step
    independently of the step code, before running it in a pipeline.
    In this example, the step can be configured to drop NA values, drop some
    columns and normalize numerical columns. See the documentation for more
    information:

        https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines

    Args:
        dataset_trn: The train dataset.
        dataset_tst: The test dataset.
        drop_na: If `True` all NA rows will be dropped.
        normalize: If `True` all numeric fields will be normalized.
        drop_columns: List of column names to drop.

    Returns:
        The processed datasets (dataset_trn, dataset_tst) and fitted `Pipeline` object.
    """
    ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
    # We use the sklearn pipeline to chain together multiple preprocessing steps
    preprocess_pipeline = Pipeline([("passthrough", "passthrough")])

    # Column labels that will still exist when the final cast step runs.
    output_columns = list(dataset_trn.columns)

    if drop_na:
        preprocess_pipeline.steps.append(("drop_na", NADropper()))
    if drop_columns:
        # Drop columns
        preprocess_pipeline.steps.append(
            ("drop_columns", ColumnsDropper(drop_columns))
        )
        # The cast step must only name the surviving columns: labelling with
        # the original columns would mismatch the narrower data it receives.
        dropped = set(drop_columns)
        output_columns = [col for col in output_columns if col not in dropped]
    if normalize:
        # Normalize the data
        preprocess_pipeline.steps.append(("normalize", MinMaxScaler()))

    # Always finish by casting back to a DataFrame with the right labels.
    preprocess_pipeline.steps.append(("cast", DataFrameCaster(output_columns)))

    # Fit on the train split only, then apply the fitted steps to the test split.
    dataset_trn = preprocess_pipeline.fit_transform(dataset_trn)
    dataset_tst = preprocess_pipeline.transform(dataset_tst)
    ### YOUR CODE ENDS HERE ###
    return dataset_trn, dataset_tst, preprocess_pipeline
# {% include 'template/license_header' %} | |
@step
def data_splitter(
    dataset: pd.DataFrame, test_size: float = 0.2, random_state: int = 42
) -> Tuple[
    Annotated[pd.DataFrame, "raw_dataset_trn"],
    Annotated[pd.DataFrame, "raw_dataset_tst"],
]:
    """Dataset splitter step.

    This is an example of a dataset splitter step that splits the data
    into train and test set before passing it to ML model.

    This step is parameterized, which allows you to configure the step
    independently of the step code, before running it in a pipeline.
    In this example, the step can be configured to use different test
    set sizes. See the documentation for more information:

        https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines

    Args:
        dataset: Dataset read from source.
        test_size: 0.0..1.0 defining portion of test set.
        random_state: Seed passed to `train_test_split`; the default of 42
            matches the previously hard-coded value.

    Returns:
        The split dataset: dataset_trn, dataset_tst.
    """
    ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
    dataset_trn, dataset_tst = train_test_split(
        dataset,
        test_size=test_size,
        random_state=random_state,
        shuffle=True,
    )
    # Re-wrap both splits so the outputs are DataFrames carrying the
    # input's column labels.
    dataset_trn = pd.DataFrame(dataset_trn, columns=dataset.columns)
    dataset_tst = pd.DataFrame(dataset_tst, columns=dataset.columns)
    ### YOUR CODE ENDS HERE ###
    return dataset_trn, dataset_tst
@step
def data_loader(
    random_state: int, is_inference: bool = False
) -> Tuple[
    Annotated[pd.DataFrame, "dataset"],
    Annotated[str, "target"],
    Annotated[int, "random_state"],
]:
    """Dataset reader step.

    This is an example of a dataset reader step that loads the Breast Cancer
    dataset.

    This step is parameterized, which allows you to configure the step
    independently of the step code, before running it in a pipeline.
    In this example, the step can be configured with the sampling seed and
    whether to return the inference subset without its target column. See the
    documentation for more information:

        https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines

    Args:
        random_state: Random state for sampling the inference subset.
        is_inference: If `True` subset will be returned and target column
            will be removed from dataset.

    Returns:
        The dataset artifact as Pandas DataFrame, the name of the target
        column, and the `random_state` that was used.
    """
    ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
    target = "target"
    frame: pd.DataFrame = load_breast_cancer(as_frame=True).frame

    # Hold out 5% of the rows as the inference subset.
    inference_size = int(len(frame) * 0.05)
    inference_subset = frame.sample(inference_size, random_state=random_state)

    if is_inference:
        # Inference data must not carry the label.
        dataset = inference_subset.drop(columns=target)
    else:
        # Training data is everything that was not held out for inference.
        dataset = frame.drop(inference_subset.index)
    dataset = dataset.reset_index(drop=True)

    logger.info(f"Dataset with {len(dataset)} records loaded!")
    ### YOUR CODE ENDS HERE ###
    return dataset, target, random_state
@pipeline
def _training(
    test_size: float = 0.2,
    drop_na: Optional[bool] = None,
    normalize: Optional[bool] = None,
    drop_columns: Optional[List[str]] = None,
    random_state: Optional[int] = None,
):
    """
    Data preparation pipeline.

    This is a pipeline that loads the data, splits it into train and
    test sets and preprocesses both splits. No model is searched for,
    trained or evaluated here.

    Args:
        test_size: Size of holdout set for training 0.0..1.0
        drop_na: If `True` NA values will be removed from dataset
        normalize: If `True` dataset will be normalized with MinMaxScaler
        drop_columns: List of columns to drop from dataset
        random_state: Seed handed to the data loader. When `None`, a random
            integer in 0..100 is drawn, which matches the previous behavior.
    """
    ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
    # Link all the steps together by calling them and passing the output
    # of one step as the input of the next step.
    if random_state is None:
        # No seed requested: keep the original "fresh seed per run" behavior.
        random_state = random.randint(0, 100)

    # Only the dataset output is consumed downstream.
    raw_data, _, _ = data_loader(random_state=random_state)
    dataset_trn, dataset_tst = data_splitter(
        dataset=raw_data,
        test_size=test_size,
    )
    dataset_trn, dataset_tst, _ = data_preprocessor(
        dataset_trn=dataset_trn,
        dataset_tst=dataset_tst,
        drop_na=drop_na,
        normalize=normalize,
        drop_columns=drop_columns,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment