Cute Parallel Pandas Apply example
from joblib import Parallel, delayed
import functools

import pandas as pd


def parallel_apply(partition_col=None, n_partitions=None):
    """
    This decorator wraps any transformer function that takes a pandas DataFrame
    as the keyword argument "data" and returns a pandas DataFrame.

    The signature must be:

        f(*args, data=..., **kwargs)

    Usage example:

        @parallel_apply(partition_col='c', n_partitions=10)
        def my_transform(mapping_table, data=None, columns=['a', 'b']):
            return data.join(mapping_table.groupby(columns).sum())

    Under the hood, the input DataFrame is hashed on the chosen partition_col,
    broken into n_partitions chunks, and each chunk is transformed in parallel.
    """
    assert partition_col is not None, 'To use this decorator, you must choose a partition column!'
    assert n_partitions is not None, 'To use this decorator, you must choose some number of partitions!'

    def decorator_funk(func):
        """Simple decorator that takes a function and returns a function."""
        # Runs once, when the decorator is applied to func (typically at import time).
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if 'data' not in kwargs:
                raise ValueError("This parallel apply operation requires a data keyword argument")
            # Bucket each row by hashing the partition column modulo n_partitions.
            # (hash_array expects an ndarray, hence .to_numpy().)
            # Note: this adds a temporary column to the caller's DataFrame in place.
            kwargs['data']['_hash_partition'] = (
                pd.util.hash_array(kwargs['data'][partition_col].to_numpy()) % n_partitions
            )
            other_kwargs = {k: v for k, v in kwargs.items() if k != 'data'}
            # Run func on each bucket in parallel, then stitch the results back together.
            return pd.concat(
                Parallel(n_jobs=-2, prefer='threads')(
                    delayed(func)(*args, data=chunk, **other_kwargs)
                    for _, chunk in kwargs['data'].groupby('_hash_partition')
                )
            ).drop(columns=['_hash_partition'])

        return wrapper

    return decorator_funk
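
A minimal usage sketch (not part of the original gist; the toy DataFrame and the add_user_total transform below are made up for illustration). It shows why hash-partitioning on the grouping key is safe: every row with a given user_id lands in the same partition, so a per-user aggregate computed inside one chunk is globally correct.

import pandas as pd

@parallel_apply(partition_col='user_id', n_partitions=4)
def add_user_total(data=None, value_col='amount'):
    # Receives one hash partition of the full DataFrame at a time.
    # All rows of a given user_id share a partition, so this per-user
    # sum matches what a groupby on the full DataFrame would produce.
    out = data.copy()
    out['user_total'] = out.groupby('user_id')[value_col].transform('sum')
    return out

df = pd.DataFrame({
    'user_id': [1, 2, 1, 3, 2, 3],
    'amount': [10, 20, 30, 40, 50, 60],
})
result = add_user_total(data=df)
print(result)

Note that rows come back ordered by hash partition rather than in their original order, and that the decorator temporarily adds a _hash_partition column to the input DataFrame in place.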