import os
import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get  # legacy multiprocessing scheduler

num_workers = 20  # or use int(os.environ["SLURM_JOB_CPUS_PER_NODE"])
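# Note: on multi-node jobs SLURM_JOB_CPUS_PER_NODE can be a list such as
# "20(x2)", which int() cannot parse; a stricter parse (an assumption,
# not part of the original gist) could be:
#   num_workers = int(os.environ["SLURM_JOB_CPUS_PER_NODE"].split("(")[0])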
def apply_row(row):
    # Returning a Series from DataFrame.apply(axis=1) yields one output
    # column per key
    return pd.Series({
        "newColA": row["A"] + row["B"],
        "newColB": row["A"] + row["C"]})
# Read the table into an in-memory pandas DataFrame
pandas_df = pd.read_hdf("file.h5", "table")
# Convert to dask, splitting into one partition per worker.
# Set name= so dask doesn't hash pandas_df to generate a token
ddf = dd.from_pandas(pandas_df, name="pandas_test", npartitions=num_workers)
# Describe the output so dask can build the graph lazily: an empty
# DataFrame with the expected column names and dtypes
meta = pd.DataFrame({"newColA": pd.Series(dtype=str),
                     "newColB": pd.Series(dtype=str)})
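# dask also accepts a {column: dtype} mapping for meta, e.g.
#   meta = {"newColA": str, "newColB": str}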
# Apply the function to each partition in parallel on num_workers cores.
# The multiprocessing get builds a pool, roughly:
#   pool = multiprocessing.Pool(num_workers)
new_columns = ddf.map_partitions(lambda _df: _df.apply(apply_row, axis=1),
                                 meta=meta).compute(get=get, num_workers=num_workers)
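# Newer dask releases replaced the get= keyword with scheduler=; the
# equivalent call would be:
#   new_columns = ddf.map_partitions(
#       lambda _df: _df.apply(apply_row, axis=1),
#       meta=meta).compute(scheduler="processes", num_workers=num_workers)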
# Copy the computed columns back into the original DataFrame
# (plain [] assignment, since .loc can't create multiple new columns at once)
pandas_df[["newColA", "newColB"]] = new_columns
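# Minimal end-to-end run on synthetic data (an assumed example, not part
# of the original gist; avoids needing "file.h5" on disk):
#   df = pd.DataFrame({"A": list("xyz"), "B": list("123"), "C": list("456")})
#   ddf2 = dd.from_pandas(df, name="demo", npartitions=2)
#   out = ddf2.map_partitions(lambda _df: _df.apply(apply_row, axis=1),
#                             meta=meta).compute(get=get, num_workers=2)
#   df[["newColA", "newColB"]] = out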