@yong27
Last active April 12, 2023 04:35
pandas DataFrame apply multiprocessing
import multiprocessing

import numpy as np
import pandas as pd


def _apply_df(args):
    # Unpack one chunk and run DataFrame.apply on it.
    df, func, kwargs = args
    return df.apply(func, **kwargs)


def apply_by_multiprocessing(df, func, **kwargs):
    # Split the frame into one chunk per worker, apply func to each chunk
    # in a separate process, then concatenate the results.
    workers = kwargs.pop('workers')
    pool = multiprocessing.Pool(processes=workers)
    result = pool.map(_apply_df, [(d, func, kwargs)
                                  for d in np.array_split(df, workers)])
    pool.close()
    return pd.concat(result)


def square(x):
    return x ** x


if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    print(apply_by_multiprocessing(df, square, axis=1, workers=4))  # run with 4 worker processes
@joshlk commented Nov 25, 2015

On large dataframes there seems to be a bug whereby the order of values within each row is lost, i.e. the values get shuffled.

@smsaladi

@joshlk It's been a while, but could you provide details about your bug? I have been able to use this snippet without any issues.

@paulochf

I used it without problems too.

@creasyw commented Sep 2, 2016

Thanks!! This is a great example.

@tejaslodaya commented Feb 2, 2017

Yes, the order of the rows will be lost, because each chunk of the DataFrame is appended back as and when its sub-process completes. To resolve this bug, we need to associate a key with each chunk (in ascending order) and sort the results by that key when they are returned.

Refer to the modified version below:
https://gist.github.com/tejaslodaya/562a8f71dc62264a04572770375f4bba
CC: @joshlk @smsaladi
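For reference, here is a minimal sketch of the keyed approach described above; the names _apply_df_keyed and apply_by_multiprocessing_keyed are illustrative, not taken from the linked gist:

import multiprocessing

import numpy as np
import pandas as pd

def _apply_df_keyed(args):
    # Each chunk carries its position so the results can be re-ordered.
    df, func, key, kwargs = args
    return key, df.apply(func, **kwargs)

def apply_by_multiprocessing_keyed(df, func, **kwargs):
    workers = kwargs.pop('workers')
    with multiprocessing.Pool(processes=workers) as pool:
        jobs = [(d, func, i, kwargs)
                for i, d in enumerate(np.array_split(df, workers))]
        results = pool.map(_apply_df_keyed, jobs)
    # Sort by chunk key before concatenating, restoring the input order.
    results.sort(key=lambda pair: pair[0])
    return pd.concat([chunk for _, chunk in results])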

@yong27 (Author) commented Sep 9, 2017

@geekan commented Sep 20, 2017

It's not working when trying to get data (from another big map) as a new column. It seems like it needs shared memory.
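One possible workaround, sketched here under the assumption that apply_by_multiprocessing from the gist above is in scope (BIG_MAP and lookup are hypothetical names): bind the lookup map to the applied function with functools.partial, so each worker process receives its own pickled copy. True shared memory would need something like multiprocessing.Manager instead.

import functools

import pandas as pd

# Hypothetical stand-in for the "big map" being looked up.
BIG_MAP = {i: i * 10 for i in range(1000)}

def lookup(row, mapping):
    # Fetch the mapped value for column 'a' of this row.
    return mapping.get(row['a'])

if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10)})
    # Each worker process gets a pickled copy of BIG_MAP via partial.
    df['new'] = apply_by_multiprocessing(
        df, functools.partial(lookup, mapping=BIG_MAP), axis=1, workers=4)
    print(df)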

@hkhatod commented Dec 4, 2017

Can't you just use pool.join()? It should take care of the index.

@bsless commented Jun 24, 2018

Another note worth adding: I wanted to pass an argument to the function being applied. The best way I found to do that with the suggested implementation is functools.partial, which creates picklable objects, so the application looks like:

apply_by_multiprocessing(df, functools.partial(my_func, some_arg), axis=1, workers=4)
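A minimal runnable sketch of this pattern (add_n is an illustrative function, and apply_by_multiprocessing is assumed to be the gist function above):

import functools

import pandas as pd

def add_n(row, n):
    # Add a constant to every value in the row.
    return row + n

if __name__ == '__main__':
    df = pd.DataFrame({'a': range(5), 'b': range(5)})
    # partial(add_n, n=10) pickles cleanly because add_n is module-level.
    print(apply_by_multiprocessing(df, functools.partial(add_n, n=10),
                                   axis=1, workers=2))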

@zahrashuaib

I'm getting this error:
PicklingError: Can't pickle <function _apply_df at 0x7f2c2e876158>: attribute lookup _apply_df on __main__ failed
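This error usually means the worker processes cannot re-import _apply_df by name, which typically happens when the gist code is defined in an interactive session (e.g. a notebook) rather than in an importable module. A sketch of the usual fix, assuming the gist code is saved as a module named parallel_apply.py (a hypothetical filename):

# main.py -- assumes the gist code above is saved as parallel_apply.py,
# so _apply_df and apply_by_multiprocessing are importable by name.
from parallel_apply import apply_by_multiprocessing

import pandas as pd

def double(x):
    # Illustrative row function, defined at module top level.
    return x * 2

if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    print(apply_by_multiprocessing(df, double, axis=1, workers=4))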

@Samrat-Learner

Thanks for the code, I got a good idea from this. But I need to send two DataFrames and one pandas.core.groupby.generic.DataFrameGroupBy object to my_function(). How can I achieve this through multiprocessing? If anyone can share thoughts or expertise, that would be a great help.

@zahrashuaib

> But I need to send two DataFrames and one pandas.core.groupby.generic.DataFrameGroupBy object to my_function(). How can I achieve this through multiprocessing?

Check this: https://github.com/zahrashuaib/parallel-computing. The dataframe is sent to the function for multiprocessing there.

@akhtarshahnawaz

I wrote a package for using apply methods on Series, DataFrames, and DataFrameGroupBy objects across multiple cores. It makes multiprocessing with pandas very easy.

You can check the documentation at https://github.com/akhtarshahnawaz/multiprocesspandas

You can also install the package directly using pip

pip install multiprocesspandas

Then doing multiprocessing is as simple as importing the package as

from multiprocesspandas import applyparallel

and then using apply_parallel instead of apply, like:

def func(x):
    import pandas as pd
    return pd.Series([x['C'].mean()])

df.groupby(["A","B"]).apply_parallel(func, num_processes=30)
