Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Performs a Pandas groupby operation in parallel
# coding=utf-8
import pandas as pd
import itertools
import time
import multiprocessing
from typing import Callable, Tuple, Union
def groupby_parallel(groupby_df: pd.core.groupby.DataFrameGroupBy,
                     func: Callable[..., Union[pd.DataFrame, pd.Series]],
                     num_cpus: int=multiprocessing.cpu_count() - 1,
                     logger: Callable[[str], None]=print,
                     poll_interval: float=0.4) -> pd.DataFrame:
    """Performs a Pandas groupby operation in parallel.

    Each group is sent to a worker process and ``func`` is invoked there as
    ``func(name, group)``. The per-group results are concatenated with
    ``pd.concat`` in the order the groups are produced by ``groupby_df``.

    Args:
        groupby_df: The grouped frame to process, e.g. ``df.groupby('A')``.
        func: Called with two positional arguments, the group key and the
            group's DataFrame; must return a DataFrame or Series. It is sent
            to worker processes, so it must be picklable (a module-level
            function, not a lambda or nested function).
        num_cpus: Number of worker processes. Values below 1 are raised to 1;
            ``None`` lets ``multiprocessing.Pool`` pick its own default.
        logger: Receives progress messages as a single string argument.
        poll_interval: Seconds to sleep between progress checks.

    Returns:
        The concatenated per-group results, or an empty DataFrame when
        ``groupby_df`` contains no groups.

    Raises:
        Whatever exception ``func`` raised in a worker; it is re-raised here
        as soon as the failed group is noticed.

    Example usage:
        import pandas as pd

        def total_b(name, group):
            return pd.Series({name: group['B'].sum()})

        df = pd.DataFrame({'A': [0, 1], 'B': [100, 200]})
        groupby_parallel(df.groupby('A'), total_b)

    Authors: Tamas Nagy and Douglas Myers-Turnbull
    """
    start = time.time()
    # The default (cpu_count() - 1) is 0 on a single-core machine, and
    # Pool(0) raises ValueError, so always keep at least one worker.
    if num_cpus is not None:
        num_cpus = max(1, num_cpus)
    logger("\nUsing {} CPUs in parallel...".format(num_cpus))

    groups = list(groupby_df)
    if not groups:
        # pd.concat([]) raises ValueError; nothing to do, so skip the pool too.
        logger("\nProcessed 0 groups in {:.1f}s".format(time.time() - start))
        return pd.DataFrame()

    total = len(groups)
    cycler = itertools.cycle('\\|/―')
    # Only print understands end="\r"; other loggers get one plain message
    # each time the completed count changes, instead of one per poll.
    interactive = logger is print
    last_reported = -1
    with multiprocessing.Pool(num_cpus) as pool:
        # One task per group, so progress can be measured by counting the
        # finished tasks.
        pending = [pool.apply_async(func, (name, group)) for name, group in groups]
        while True:
            finished = 0
            for task in pending:
                if task.ready():
                    task.get()  # re-raises a worker exception without waiting for the rest
                    finished += 1
            if finished == total:
                break
            message = "Percent complete: {:.0%} {}".format(finished / total, next(cycler))
            if interactive:
                print(message, end="\r")
            elif finished != last_reported:
                logger(message)
                last_reported = finished
            time.sleep(poll_interval)
        got = [task.get() for task in pending]
    logger("\nProcessed {} groups in {:.1f}s".format(len(got), time.time() - start))
    return pd.concat(got)
@lukereding

This comment has been minimized.

Copy link

@lukereding lukereding commented Oct 16, 2018

Unless I'm misunderstanding something, the example in the docstring doesn't use groupby_parallel.

@Jianpeng-Xu

This comment has been minimized.

Copy link

@Jianpeng-Xu Jianpeng-Xu commented Sep 11, 2019

Can you share your idea of how to pass additional arguments through to func? Thanks

@dmyersturnbull

This comment has been minimized.

Copy link
Owner Author

@dmyersturnbull dmyersturnbull commented Sep 16, 2019

@Jianpeng-Xu You can wrap it in another function.

def x(dfgroup, order: int):
    return dfgroup.mean() + order
groupby_parallel(df, lambda group: x(group, 3))

Or, use functools.partial.

@Jianpeng-Xu

This comment has been minimized.

Copy link

@Jianpeng-Xu Jianpeng-Xu commented Sep 16, 2019

@Jianpeng-Xu You can wrap it in another function.

def x(dfgroup, order: int):
    return dfgroup.mean() + order
groupby_parallel(df, lambda group: x(group, 3))

Or, use functools.partial.

Thanks for the comments. I just added a kwargs dict into the function definition to make it generic.

@ReneHamburger1993

This comment has been minimized.

Copy link

@ReneHamburger1993 ReneHamburger1993 commented Jun 20, 2020

The example does not work this way, right?
Instead of

df.groupby(df.groupby('A'), lambda row: row['B'].sum())

it should be

groupby_parallel(df.groupby('A'), lambda row: row['B'].sum())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment