Last active
March 19, 2021 19:09
-
-
Save Proteusiq/98909d83a24ae6016789cdd6e243e727 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from functools import wraps | |
from pathlib import Path | |
from typing import Callable, Optional, Union | |
import numpy as np | |
import pandas as pd | |
import ray | |
def pipeline_logger(function: Callable) -> Callable:
    """Decorate a pipeline step so each call prints its name, output shape and runtime.

    The wrapped step's return value is passed through unchanged. Results that
    have no ``shape`` attribute are reported as ``shape = n/a`` instead of
    raising ``AttributeError`` after the step has already finished.

    Args:
        function: The pipeline step to time.

    Returns:
        A wrapper carrying ``function``'s name and docstring (``functools.wraps``).
    """
    # What gets measured can be optimised: report wall-clock time per step.
    @wraps(function)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = function(*args, **kwargs)
        time_taken = time.perf_counter() - start_time
        shape = getattr(result, "shape", "n/a")
        print(
            f"[+] {function.__name__} completed "
            f"| shape = {shape} | time {time_taken:.3f}s"
        )
        return result

    return wrapper
@pipeline_logger
def parallize(df, function, n_jobs: int, *args, **kwargs) -> pd.DataFrame:
    """Run a Ray remote ``function`` over row-chunks of ``df`` and stitch the results.

    Args:
        df: Input DataFrame; split row-wise into contiguous chunks.
        function: A ``@ray.remote`` function that takes a DataFrame chunk as its
            first argument, followed by ``*args``/``**kwargs``, and returns a
            DataFrame.
        n_jobs: Maximum number of chunks (remote tasks) to create; must be >= 1.
        *args, **kwargs: Forwarded to every ``function.remote`` call.

    Returns:
        The chunk results concatenated in chunk order, re-indexed from 0
        (``ignore_index=True``).

    Raises:
        ValueError: If ``n_jobs`` is less than 1.
    """
    if n_jobs < 1:
        raise ValueError(f"n_jobs must be >= 1, got {n_jobs}")
    # Never create more chunks than rows, so no task receives an empty frame;
    # an empty df still yields exactly one (empty) chunk so concat has input.
    n_chunks = min(n_jobs, max(len(df), 1))
    # Split positional row indices instead of the DataFrame itself:
    # np.array_split on a DataFrame relies on the deprecated DataFrame.swapaxes.
    futures = [
        function.remote(df.iloc[positions], *args, **kwargs)
        for positions in np.array_split(np.arange(len(df)), n_chunks)
    ]
    # ray.get on a list returns results in the same order as the refs.
    return pd.concat(ray.get(futures), ignore_index=True)
@pipeline_logger
def start_transformation(df) -> pd.DataFrame:
    """Return a copy of ``df`` so later pipeline steps never modify the caller's data.

    Intended as the first ``.pipe`` step. ``DataFrame.copy`` is deep by default.
    """
    return df.copy()
@ray.remote
def drop_single_words(
    df: pd.DataFrame, column_name: str = "description"
) -> pd.DataFrame:
    """Keep only rows whose ``column_name`` text contains more than one word.

    Words are whitespace-separated tokens (``str.split()``). Rows whose value
    is missing or not a string are dropped as well: their token count is NaN,
    and ``NaN > 1`` evaluates to False.

    Args:
        df: Chunk of the dataset.
        column_name: Name of an existing text column to inspect (``None`` is
            not supported).

    Returns:
        The filtered DataFrame; original index labels are preserved.
    """
    word_counts = df[column_name].str.split().str.len()
    return df[word_counts > 1]
@ray.remote
def fill_missing_values(
    df: pd.DataFrame,
    column_name: str,
    missing_value_replacer: Union[str, bool, int, float],
    method=None,
) -> pd.DataFrame:
    """Fill missing values in one column of ``df``.

    Args:
        df: Chunk of the dataset; its ``column_name`` column is replaced.
        column_name: Column whose missing values are filled.
        missing_value_replacer: Constant written into missing cells.
        method: Optional propagation strategy applied first: ``"ffill"``/``"pad"``
            or ``"bfill"``/``"backfill"``. Gaps that propagation cannot close
            (e.g. leading NaNs for a forward fill) are then filled with
            ``missing_value_replacer``. When run through ``parallize`` each
            chunk is filled independently, so values do not propagate across
            chunk boundaries.

    Returns:
        ``df`` with ``column_name`` filled.

    Raises:
        ValueError: If ``method`` is not one of the supported strategies, or
            if pandas rejects the fill (e.g. ``missing_value_replacer`` is None).
    """
    column = df[column_name]
    if method is not None:
        # fillna raises when given both value= and method=, and its method=
        # argument is deprecated, so propagate explicitly before filling.
        if method in ("ffill", "pad"):
            column = column.ffill()
        elif method in ("bfill", "backfill"):
            column = column.bfill()
        else:
            raise ValueError(
                "method must be 'ffill', 'pad', 'bfill' or 'backfill', "
                f"got {method!r}"
            )
    df[column_name] = column.fillna(value=missing_value_replacer)
    return df
# Usage: the transformations above are assumed to live in a ``src`` package
# (e.g. ``from src.transformation import ...``), each with unit tests in a
# ``test`` folder. Ray remote steps are fanned out across cores via ``parallize``.
NUM_CORES = 8
DATA_FOLDER = Path("./data")
RAW_MARKETING_DATA = DATA_FOLDER / "raw/estate_marketing.csv"
# Output is bz2-compressed, so it gets the matching ``.bz2`` extension.
PROCESSED_MARKETING_DATA = DATA_FOLDER / "processed/estate_marketing.csv.bz2"

# Start a local Ray runtime limited to NUM_CORES CPUs.
ray.init(num_cpus=NUM_CORES)
try:
    # Read the raw data, parsing sale_date as datetimes.
    dataf = pd.read_csv(RAW_MARKETING_DATA, parse_dates=["sale_date"])

    # Perform the data transformations.
    data = (
        dataf.pipe(start_transformation)
        .pipe(parallize, function=drop_single_words, n_jobs=NUM_CORES)
        .pipe(
            parallize,
            function=fill_missing_values,
            column_name="energy_label",
            missing_value_replacer="NA",
            n_jobs=NUM_CORES,
        )
    )

    # Save the compressed processed data. Overwrite (mode="w"): appending
    # would add another header plus duplicate rows on every rerun.
    PROCESSED_MARKETING_DATA.parent.mkdir(parents=True, exist_ok=True)
    data.to_csv(PROCESSED_MARKETING_DATA, compression="bz2", mode="w", index=False)
finally:
    # Always release the Ray workers, even if a step above fails.
    ray.shutdown()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment