@dmyersturnbull
Last active February 6, 2024 00:43
Performs a Pandas groupby operation in parallel
import itertools
import multiprocessing
import time
from typing import Any, Callable, Union

import pandas as pd


def groupby_parallel(
    groupby_df: pd.core.groupby.DataFrameGroupBy,
    func: Callable[[str, pd.DataFrame], Union[pd.DataFrame, pd.Series]],
    num_cpus: int = multiprocessing.cpu_count() - 1,
    log_fn: Callable[[str], Any] = print,
    wait_sec: float = 0.4,
) -> pd.DataFrame:
    """
    Performs a Pandas groupby operation in parallel.

    Authors: Tamas Nagy and Douglas Myers-Turnbull

    Example:
        import pandas as pd
        df = pd.DataFrame({"A": [0, 1], "B": [100, 200]})
        groupby_parallel(df.groupby("A"), lambda name, group: group[["B"]].sum())
    """
    start = time.time()
    log_fn(f"\nUsing {num_cpus} CPUs in parallel...")
    with multiprocessing.Pool(num_cpus) as pool:
        queue = multiprocessing.Manager().Queue()
        # starmap_async unpacks each (name, group) pair into two arguments for func
        result = pool.starmap_async(func, [(name, group) for name, group in groupby_df])
        cycler = itertools.cycle("\\|/―")
        while not result.ready():
            # The queue only fills if func reports progress to it, so this fraction
            # stays at 0% unless func is written to do so.
            log_fn(f"Percent complete: {queue.qsize() / len(groupby_df):.0%} {next(cycler)}")
            time.sleep(wait_sec)
        got = result.get()
    log_fn(f"\nProcessed {len(got)} groups in {time.time() - start:.1f}s")
    return pd.concat(got)
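
A usage note not from the gist itself: multiprocessing pickles func when dispatching work to the pool, so a lambda like the one in the docstring example can fail with a pickling error; a module-level function is the safer pattern. A minimal sketch, assuming the groupby_parallel above (sum_b and the toy DataFrame are made up for illustration):

import pandas as pd

def sum_b(name: str, group: pd.DataFrame) -> pd.Series:
    # Return a Series (not a bare scalar) so pd.concat can stitch the results together
    return group[["B"]].sum()

if __name__ == "__main__":  # guard needed on platforms that spawn worker processes
    df = pd.DataFrame({"A": [0, 0, 1, 1], "B": [100, 200, 300, 400]})
    print(groupby_parallel(df.groupby("A"), sum_b))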
@lukereding

Unless I'm misunderstanding something, the example in the docstring doesn't use groupby_parallel.

@Jianpeng-Xu

Can you share your idea on how to pass additional arguments through to func? Thanks

@dmyersturnbull (Author)

@Jianpeng-Xu You can wrap it in another function.

def x(dfgroup, order: int):
    return dfgroup.mean() + order

groupby_parallel(df.groupby("A"), lambda name, group: x(group, 3))

Or, use functools.partial.
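
A minimal sketch of the functools.partial route, using a hypothetical helper (here called add_offset; the extra name parameter is there because starmap passes each group key along with its sub-DataFrame). A practical bonus is that a partial of a module-level function can be pickled for the worker processes, which a lambda generally cannot:

import functools

import pandas as pd

def add_offset(name: str, dfgroup: pd.DataFrame, order: int) -> pd.Series:
    # name is the group key; dfgroup is that group's sub-DataFrame
    return dfgroup.mean() + order

# Bind the extra argument up front; the result still accepts (name, group)
func = functools.partial(add_offset, order=3)
groupby_parallel(df.groupby("A"), func)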

@Jianpeng-Xu

Thanks for the comments. I just added a kwargs dict into the function definition to make it generic.
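
Purely as an illustration of that idea (an assumption about the shape of the change, not the commenter's actual code), a kwargs-dict variant might look roughly like this:

import functools
import multiprocessing

import pandas as pd

def groupby_parallel_with_kwargs(
    groupby_df: pd.core.groupby.DataFrameGroupBy,
    func,
    func_kwargs: dict = None,
    num_cpus: int = multiprocessing.cpu_count() - 1,
) -> pd.DataFrame:
    # Hypothetical variant: forward extra keyword arguments to func for every group.
    func_kwargs = func_kwargs or {}
    wrapped = functools.partial(func, **func_kwargs)
    with multiprocessing.Pool(num_cpus) as pool:
        # starmap still sees a two-argument callable taking (name, group)
        got = pool.starmap(wrapped, [(name, group) for name, group in groupby_df])
    return pd.concat(got)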

@ReneHamburger1993

The example does not work this way, right?
Instead of

df.groupby(df.groupby('A'), lambda row: row['B'].sum())

it should be

groupby_parallel(df.groupby('A'), lambda row: row['B'].sum())

@dmyersturnbull (Author)

@ReneHamburger1993 Fixed. Thanks!
