Pandas groupby-apply using processit
"""
System/Runtime Requirements:
>=Python3.7
Linux / Mac
>=2 CPU Cores
Must pip install to use `pd_processit`:
pickle5, tqdm, numpy, pandas
To use this file, have processit.py in same folder as pd_processit.py (this file).
processit.py: https://gist.github.com/austospumanto/6205276f84cd4dde38f3ce17dddccdb3
"""
from multiprocessing import cpu_count
from typing import Callable, Optional

import pandas as pd

# processit.py must sit alongside this file (see the docstring above)
from processit import processit, PdIndexT, chunks

NCPUS = cpu_count()

# Short aliases for common pandas types
DF = pd.DataFrame
S = pd.Series
I = pd.Index


def groupby_apply(df: DF, fn: Callable, njobs: int = NCPUS, desc: Optional[str] = None) -> DF:
    """Parallel groupby-apply over the levels of ``df``'s index.

    The unique values of the first index level are split into chunks. Each chunk
    is handed to a worker process (via ``processit``), which runs a
    ``groupby(...).apply(fn)`` on its slice of ``df``. The per-chunk results are
    then concatenated back into a single DataFrame.
    """
    unique_idxs: I = df.index.get_level_values(0).unique().sort_values()
    concat = True  # Was an argument, but was never used
    return _chunk_process_concat(
        df=df,
        fn=fn,
        target=_groupby_apply_chunk,
        map_onto=unique_idxs,
        njobs=njobs,
        common_kwargs=None,
        axis="index",
        concat=concat,
        desc=desc,
    )


def _chunk_process_concat(
    df, fn, target, map_onto, njobs, common_kwargs, axis, concat, desc: Optional[str] = None,
):
    # Split the values in `map_onto` into at most `njobs` chunks (one worker process per chunk).
    ntodos = min(njobs, len(map_onto))
    todos = [
        dict(target=target, kwargs=dict(ids_chunk=ch, fn=fn, df=df))
        for ch in chunks(map_onto, nchunks=ntodos)
    ]
    assert len(todos) == ntodos, (len(todos), ntodos)

    # Run each todo (one per chunk) in its own worker process.
    results = processit(todos, max_nprocs=ntodos, common_kwargs=common_kwargs, desc=desc)

    if concat:
        if axis == "columns" and isinstance(results[0], S):
            return pd.concat(results, sort=False, axis="index")
        else:
            # Stitch the per-chunk frames back together and restore the original
            # first-level index.
            return pd.concat(results, sort=False, axis=axis, ignore_index=True).set_index(
                df.index.names[0], drop=True
            )
    else:
        return results


def _groupby_apply_chunk(fn: Callable, df: DF, ids_chunk: PdIndexT):
    idx_names = df.index.names

    # Restrict to the rows whose first-level index values fall in this chunk.
    ret = df.loc[ids_chunk]

    # Move every index level into the columns so the data can be grouped by name below.
    # If an index level already exists as a column, drop it when resetting.
    while ret.index.names != [None]:
        drop = bool(ret.index.names[-1] in df)
        ret.reset_index(level=-1, drop=drop, inplace=True)

    return ret.groupby(by=idx_names, as_index=True, observed=True, sort=False).apply(fn)
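

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original gist). It assumes
# processit.py (linked in the docstring) sits next to this file and that the
# packages listed above are installed. The toy DataFrame, the "customer_id" /
# "order_value" names, and the `_demo_total_spend` helper are hypothetical and
# exist only to show the calling convention of `groupby_apply`.
# ---------------------------------------------------------------------------
def _demo_total_spend(group: DF) -> DF:
    # Defined at module level so worker processes can look it up by reference.
    # `group.name` holds the group key; returning it as a column lets the final
    # concat/set_index step in `_chunk_process_concat` restore the index.
    return DF({"customer_id": [group.name], "total": [group["order_value"].sum()]})


if __name__ == "__main__":
    import numpy as np

    # Toy frame: 100 hypothetical customers with 10 orders each, indexed by the
    # grouping key (groupby_apply groups on the first index level).
    demo = DF(
        {
            "customer_id": np.repeat(np.arange(100), 10),
            "order_value": np.random.rand(1000),
        }
    ).set_index("customer_id")

    result = groupby_apply(demo, _demo_total_spend, njobs=4, desc="total spend per customer")
    print(result.head())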