Skip to content

Instantly share code, notes, and snippets.

@Proteusiq
Last active March 19, 2021 19:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Proteusiq/98909d83a24ae6016789cdd6e243e727 to your computer and use it in GitHub Desktop.
Save Proteusiq/98909d83a24ae6016789cdd6e243e727 to your computer and use it in GitHub Desktop.
import time
from functools import wraps
from pathlib import Path
from typing import Callable, Optional, Union
import numpy as np
import pandas as pd
import ray
def pipeline_logger(function: Callable):
    """Decorate a pipeline step so each call reports its output shape and runtime.

    The wrapped step is timed with ``time.perf_counter``. After it returns, one
    line of the form ``[+] <name> completed | shape = <shape> | time <secs>s``
    is printed, and the step's result is then passed back unchanged.

    The step's result must expose a ``.shape`` attribute (for example a pandas
    DataFrame); otherwise building the report line raises ``AttributeError``.
    """
    # You can only optimize what you measure.

    @wraps(function)
    def wrapper(*args, **kwargs):
        began = time.perf_counter()
        outcome = function(*args, **kwargs)
        elapsed = time.perf_counter() - began
        message = (
            f"[+] {function.__name__} completed "
            f"| shape = {outcome.shape} | time {elapsed:.3f}s"
        )
        print(message)
        return outcome

    return wrapper
@pipeline_logger
def parallize(df, function, n_jobs: int, *args, **kwargs) -> pd.DataFrame:
    """Apply a Ray remote ``function`` to ``df`` in ``n_jobs`` parallel chunks.

    ``df`` is split row-wise into ``n_jobs`` contiguous, nearly equal-sized
    pieces. Each piece is submitted as ``function.remote(piece, *args, **kwargs)``.
    The results are gathered with ``ray.get`` and concatenated in submission
    order with a fresh ``RangeIndex``.

    Args:
        df: DataFrame to process.
        function: A Ray remote function (decorated with ``@ray.remote``). It
            receives a DataFrame chunk as its first argument and should return
            a DataFrame.
        n_jobs: Number of chunks/tasks to create. Must be positive;
            ``np.array_split`` raises ``ValueError`` otherwise.
        *args, **kwargs: Forwarded unchanged to every ``function.remote`` call.

    Returns:
        The concatenated chunk results with the index reset.
    """
    # Split positional row numbers instead of passing the DataFrame itself to
    # np.array_split. That path goes through DataFrame.swapaxes, which is
    # deprecated in pandas 2.1+ and later removed. Once it is gone, numpy falls
    # back to ndarray conversion and the chunks would no longer be DataFrames.
    row_chunks = np.array_split(np.arange(len(df)), n_jobs)
    futures = [
        function.remote(df.iloc[rows], *args, **kwargs) for rows in row_chunks
    ]
    # ray.get returns results in the order of `futures`, so row order is kept.
    return pd.concat(ray.get(futures), ignore_index=True)
@pipeline_logger
def start_transformation(df) -> pd.DataFrame:
    """Return a copy of ``df`` so later pipeline steps never touch the caller's data.

    ``DataFrame.copy()`` defaults to ``deep=True``, so the returned frame owns
    its own data.
    """
    return df.copy()
@ray.remote
def drop_single_words(
    df: pd.DataFrame, column_name: str = "description"
) -> pd.DataFrame:
    """Keep only rows whose ``column_name`` text contains more than one word.

    Words are the whitespace-separated tokens produced by ``str.split()``.
    The following rows are dropped:

    * a single word, e.g. ``"villa"``;
    * empty or whitespace-only strings (zero tokens);
    * missing values, because ``.str`` propagates NaN and ``NaN > 1`` is False.

    Args:
        df: Chunk to filter.
        column_name: Text column to inspect.

    Returns:
        The filtered rows, keeping their original index labels.
    """
    # Annotated ``str`` rather than ``Optional[str]``: passing None would just
    # make ``df[None]`` fail unless a column is literally named None.
    word_counts = df[column_name].str.split().str.len()
    return df[word_counts > 1]
@ray.remote
def fill_missing_values(
    df: pd.DataFrame,
    column_name: str,
    missing_value_replacer: Union[str, bool, int, float],
    method=None,
) -> pd.DataFrame:
    """Fill missing values in one column of ``df``.

    Args:
        df: Chunk to update. The column is reassigned on this object, and the
            same object is returned.
        column_name: Column whose missing entries are filled.
        missing_value_replacer: Constant written into missing entries when
            ``method`` is None.
        method: Optional propagation method: ``"ffill"``/``"pad"`` or
            ``"bfill"``/``"backfill"`` (case-insensitive). As with pandas'
            ``fillna``, it cannot be combined with a non-None
            ``missing_value_replacer``.

    Returns:
        ``df`` with ``column_name`` filled.

    Raises:
        ValueError: if neither or both of ``missing_value_replacer`` and
            ``method`` are given, or if ``method`` is not a recognised name.
    """
    column = df[column_name]
    # Series.fillna(method=...) is deprecated since pandas 2.1 and later
    # removed, so `method` is never forwarded to fillna. It is mapped onto the
    # dedicated ffill/bfill methods instead.
    if method is None:
        # fillna itself raises ValueError when the replacer is also None.
        filled = column.fillna(value=missing_value_replacer)
    elif missing_value_replacer is not None:
        raise ValueError("Cannot specify both 'value' and 'method'.")
    else:
        fill_method = method.lower() if isinstance(method, str) else method
        if fill_method in ("ffill", "pad"):
            filled = column.ffill()
        elif fill_method in ("bfill", "backfill"):
            filled = column.bfill()
        else:
            raise ValueError(
                "Invalid fill method. Expecting pad (ffill) or backfill (bfill). "
                f"Got {method}"
            )
    df[column_name] = filled
    return df
# Usage: the data-transformation steps are assumed to live in a `src` package,
# each with unit tests under `test/`, e.g.:
# from src.transformation import ...
# The row-wise steps are fanned out across NUM_CORES Ray workers via `parallize`.
NUM_CORES = 8
DATA_FOLDER = Path("./data")
RAW_MARKETING_DATA = DATA_FOLDER / "raw/estate_marketing.csv"
# The file is written with bz2 compression (see to_csv below), so it gets the
# matching ".bz2" extension. The former ".gz2" suffix is not a real format, and
# pandas' compression="infer" would not recognise it when reading the file back.
PROCESSED_MARKETING_DATA = DATA_FOLDER / "processed/estate_marketing.csv.bz2"

# start the local Ray runtime
ray.init(num_cpus=NUM_CORES)
try:
    # read raw data
    dataf = pd.read_csv(RAW_MARKETING_DATA, parse_dates=["sale_date"])

    # perform data transformations
    data = (
        dataf.pipe(start_transformation)
        .pipe(parallize, function=drop_single_words, n_jobs=NUM_CORES)
        .pipe(
            parallize,
            function=fill_missing_values,
            column_name="energy_label",
            missing_value_replacer="NA",
            n_jobs=NUM_CORES,
        )
    )

    # Save the compressed processed data. Overwrite (mode="w") rather than
    # append: appending would add another header and a full copy of the rows
    # to the archive on every rerun.
    PROCESSED_MARKETING_DATA.parent.mkdir(parents=True, exist_ok=True)
    data.to_csv(PROCESSED_MARKETING_DATA, compression="bz2", mode="w", index=False)
finally:
    # shut Ray down even if reading or a transformation step fails
    ray.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment