# Performs a Pandas groupby operation in parallel
# coding=utf-8
import pandas as pd
import itertools
import time
import multiprocessing
from typing import Callable, Tuple, Union
def groupby_parallel(groupby_df: pd.core.groupby.DataFrameGroupBy,
                     func: Callable[[Tuple[str, pd.DataFrame]], Union[pd.DataFrame, pd.Series]],
                     num_cpus: int = max(1, multiprocessing.cpu_count() - 1),
                     logger: Callable[[str], None] = print) -> pd.DataFrame:
    """Performs a Pandas groupby operation in parallel.

    Each group is shipped to a worker process and ``func`` is called as
    ``func(name, group)``. ``func`` must be picklable (a module-level
    function, not a lambda, if the 'spawn' start method is in use).

    Example usage:
        import pandas as pd
        df = pd.DataFrame({'A': [0, 1], 'B': [100, 200]})
        groupby_parallel(df.groupby('A'), my_func)

    Args:
        groupby_df: the grouped DataFrame to process.
        func: callable applied to each ``(name, group)`` pair; returns a
            DataFrame or Series that will be concatenated into the result.
        num_cpus: number of worker processes. Defaults to all-but-one CPU,
            floored at 1 so single-CPU machines don't get ``Pool(0)``.
        logger: print-like callable for progress output. It is invoked
            with the keyword argument ``end``, so it must accept ``**kwargs``
            or be ``print`` itself.

    Returns:
        ``pd.concat`` of func's results over all groups.

    Authors: Tamas Nagy and Douglas Myers-Turnbull
    """
    start = time.time()
    logger("\nUsing {} CPUs in parallel...".format(num_cpus))
    with multiprocessing.Pool(num_cpus) as pool:
        # One async task per group so progress can be measured by counting
        # finished tasks. (The previous version polled a Manager queue that
        # no worker ever wrote to, so it always reported 0% complete.)
        tasks = [pool.apply_async(func, (name, group)) for name, group in groupby_df]
        cycler = itertools.cycle('\|/―')  # spinner frames
        while not all(task.ready() for task in tasks):
            done = sum(task.ready() for task in tasks)
            logger("Percent complete: {:.0%} {}".format(done / len(tasks), next(cycler)), end="\r")
            time.sleep(0.4)
        # get() re-raises any exception that occurred in a worker.
        got = [task.get() for task in tasks]
    # len(got) counts groups processed, not rows (the old message said "rows").
    logger("\nProcessed {} groups in {:.1f}s".format(len(got), time.time() - start))
    return pd.concat(got)
This comment has been minimized.
This comment has been minimized.
Can you share your idea on how to add additional arguments passing through the func? Thanks |
This comment has been minimized.
This comment has been minimized.
@Jianpeng-Xu You can wrap it in another function:
    def x(dfgroup, order: int):
        return dfgroup.mean() + order
    groupby_parallel(df, lambda group: x(group, 3))
Or use functools.partial.
This comment has been minimized.
This comment has been minimized.
Thanks for the comments. I just added a kwargs dict into the function definition to make it generic. |
This comment has been minimized.
This comment has been minimized.
The example in the docstring does not work as written — `df.groupby(df.groupby('A'), lambda row: row['B'].sum())` should be `groupby_parallel(df.groupby('A'), lambda row: row['B'].sum())`.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
Unless I'm misunderstanding something, the example in the docstring doesn't use
groupby_parallel
.