Skip to content

Instantly share code, notes, and snippets.

@govorunov
Last active June 15, 2023 20:44
Show Gist options
  • Save govorunov/3d1a214dc067b7f9df54d481f46ffb68 to your computer and use it in GitHub Desktop.
Save govorunov/3d1a214dc067b7f9df54d481f46ffb68 to your computer and use it in GitHub Desktop.
Parallelize Pandas apply over multiple workers using Joblib.
from joblib import Parallel, delayed, effective_n_jobs
import pandas as pd
def gen_even_slices(n, n_packs, *, n_samples=None):
    """Yield up to ``n_packs`` contiguous slices that together cover ``range(n)``.

    The slices are as even as possible: the first ``n % n_packs`` slices are
    one element longer than the rest. Packs that would be empty (when
    ``n < n_packs``) are skipped, so fewer than ``n_packs`` slices may be
    produced. Mirrors the helper referenced as
    ``sklearn.utils.gen_even_slices``.

    Parameters
    ----------
    n : int
        Total number of elements to split; slices run from 0 up to ``n``.
    n_packs : int
        Number of slices to generate. Must be >= 1.
    n_samples : int, default=None
        Number of samples. Pass n_samples when the slices are to be used for
        sparse matrix indexing; slicing off-the-end raises an exception, while
        it works for NumPy arrays. When given, every slice end is clamped to
        ``n_samples``.

    Yields
    ------
    slice
        ``slice(start, end, None)`` objects in increasing order.

    Raises
    ------
    ValueError
        If ``n_packs`` is smaller than 1. Because this is a generator, the
        error is raised when iteration starts, not at call time.

    Examples
    --------
    >>> list(gen_even_slices(10, 1))
    [slice(0, 10, None)]
    >>> list(gen_even_slices(10, 3))
    [slice(0, 4, None), slice(4, 7, None), slice(7, 10, None)]
    >>> list(gen_even_slices(2, 5))
    [slice(0, 1, None), slice(1, 2, None)]
    """
    if n_packs < 1:
        raise ValueError(f"gen_even_slices got n_packs={n_packs}, must be >=1")

    # Loop-invariant: every pack gets `base` elements, and the first `extra`
    # packs get one more so the remainder is spread from the front.
    base, extra = divmod(n, n_packs)

    start = 0
    for pack_num in range(n_packs):
        this_n = base + 1 if pack_num < extra else base
        if this_n <= 0:
            # Happens when n < n_packs: the trailing packs would be empty.
            continue
        end = start + this_n
        if n_samples is not None:
            # Clamp so the slice never runs past the real number of samples.
            end = min(n_samples, end)
        yield slice(start, end, None)
        start = end
def parallel_apply(df, func, n_jobs=-1, **kwargs):
    """Pandas ``apply`` in parallel using joblib.

    ``df`` is split into contiguous row chunks with ``df.iloc``, one chunk per
    effective worker; ``type(df).apply`` is run on every chunk in a joblib
    worker and the per-chunk results are stitched together with ``pd.concat``.

    NOTE: because the split is by rows, ``func`` only ever sees one chunk at a
    time. The result matches a plain ``df.apply`` only when ``func`` works on
    each row/element independently (e.g. ``axis=1`` on a DataFrame, or an
    element-wise function on a Series).

    Args:
        df: Pandas DataFrame, Series, or any other object that supports
            ``len``, ``iloc`` slicing and ``apply``.
        func: Callable to apply.
        n_jobs: Desired number of workers. Default value -1 means use all
            available cores.
        **kwargs: Any additional parameters will be supplied to the apply
            function.

    Returns:
        Same as for normal Pandas ``DataFrame.apply()`` when only one worker is
        effective or ``df`` is empty; otherwise the ``pd.concat`` of the
        per-chunk ``apply`` results.
    """
    # Resolve the worker count once; it is needed both for the fast path and
    # for deciding how many chunks to cut.
    n_workers = effective_n_jobs(n_jobs)

    # Serial fallback. An empty input yields no chunks at all, and
    # pd.concat([]) raises ValueError, so let pandas handle that case directly.
    if n_workers == 1 or len(df) == 0:
        return df.apply(func, **kwargs)

    # Use the unbound ``apply`` of the concrete type so each task is a plain
    # (function, chunk, func, kwargs) call that joblib can dispatch.
    chunk_apply = delayed(type(df).apply)
    results = Parallel(n_jobs=n_jobs)(
        chunk_apply(df.iloc[rows], func, **kwargs)
        for rows in gen_even_slices(len(df), n_workers)
    )
    return pd.concat(results)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment