Skip to content

Instantly share code, notes, and snippets.

@snakers4
Created December 14, 2018 12:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save snakers4/24cf04786224e22325f109ed15a6ac59 to your computer and use it in GitHub Desktop.
Save snakers4/24cf04786224e22325f109ed15a6ac59 to your computer and use it in GitHub Desktop.
Pandas multiprocessing wrappers
from tqdm import tqdm
import numpy as np
import pandas as pd
from multiprocessing import Pool
def _apply_df(args):
df, func, num, kwargs = args
return num, df.apply(func, **kwargs)
def apply_by_multiprocessing(df, func, **kwargs):
    """Apply ``func`` to ``df`` in parallel, one row-chunk per pool task.

    ``df`` is split positionally into ``chunks`` pieces. Each piece is sent to
    ``_apply_df`` in a process pool, and the per-chunk results are
    concatenated back in their original order.

    Control keywords (removed from ``kwargs`` before the rest is forwarded to
    ``df.apply``):
        workers: number of pool processes (required).
        chunks: number of pieces to split ``df`` into (required).

    Returns:
        ``pd.concat`` of the per-chunk ``apply`` results. If ``df`` is empty,
        ``df.apply(func, **kwargs)`` is returned directly.

    Raises:
        KeyError: if ``workers`` or ``chunks`` is missing.
        ValueError: from ``np.array_split`` if ``chunks`` is not positive.
    """
    workers = kwargs.pop('workers')
    chunks = kwargs.pop('chunks')
    # Split row *positions* rather than calling np.array_split(df, ...).
    # On DataFrames that path goes through the deprecated DataFrame.swapaxes.
    # Empty pieces (chunks > len(df)) are dropped: apply() on an empty frame
    # can return a differently-shaped object that would corrupt the concat.
    positions = [
        idx for idx in np.array_split(np.arange(len(df)), chunks) if len(idx)
    ]
    if not positions:
        # Nothing to parallelise; avoid pd.concat([]) raising ValueError.
        return df.apply(func, **kwargs)
    apply_lst = [
        (df.iloc[idx], func, i, kwargs) for i, idx in enumerate(positions)
    ]
    with Pool(workers) as p:
        result = list(tqdm(p.imap(_apply_df, apply_lst), total=len(apply_lst)))
    # imap already yields in submission order; sorting keeps that explicit.
    result = sorted(result, key=lambda x: x[0])
    return pd.concat([r[1] for r in result], sort=False)
def _apply_df_groupby(args):
group, func, name, kwargs = args
return name, func(group, **kwargs)
def multiprocessing_groupby(groupby,
                            func,
                            **kwargs):
    """Run ``func`` on every group of a pandas groupby in a process pool.

    Each ``(name, group)`` pair yielded by ``groupby`` is sent to
    ``_apply_df_groupby``. The per-group results are concatenated in the
    order the groupby yielded them.

    Args:
        groupby: an iterable of ``(name, group)`` pairs, e.g. a pandas
            ``GroupBy`` object.
        func: picklable callable invoked as ``func(group, **kwargs)``.
        workers: (keyword, required) number of pool processes. It is removed
            from ``kwargs`` before they are forwarded to ``func``.
        **kwargs: extra keyword arguments forwarded to ``func``.

    Returns:
        ``pd.concat`` of the per-group results.

    Raises:
        KeyError: if ``workers`` is not given.
        ValueError: from ``pd.concat`` if ``groupby`` yields no groups.
    """
    workers = kwargs.pop('workers')
    apply_lst = [(group, func, name, kwargs) for name, group in groupby]
    with Pool(workers) as p:
        # imap yields results in submission order, i.e. the groupby's own
        # order. Re-sorting by group name with sorted() would override
        # groupby(sort=False) and can raise TypeError for keys Python cannot
        # compare.
        result = list(tqdm(p.imap(_apply_df_groupby, apply_lst),
                           total=len(apply_lst)))
    return pd.concat([res for _, res in result], sort=False)
def list_multiprocessing(param_lst,
                         func,
                         **kwargs):
    """Map ``func`` over ``param_lst`` in a process pool, with a tqdm bar.

    Each element is passed to ``func`` as its single positional argument,
    plus the remaining ``kwargs``, through ``_apply_lst``. ``workers`` is a
    required keyword that sets the pool size and is not forwarded to ``func``.

    Returns:
        A list of results in the same order as ``param_lst``.
    """
    workers = kwargs.pop('workers')
    with Pool(workers) as pool:
        tasks = []
        for idx, item in enumerate(param_lst):
            tasks.append(([item], func, idx, kwargs))
        pairs = list(tqdm(pool.imap(_apply_lst, tasks), total=len(tasks)))
    # imap is already ordered; sorting on the task index just makes it explicit
    pairs.sort(key=lambda pair: pair[0])
    return [value for _, value in pairs]
def list_multiprocessing_nopbar(param_lst,
                                func,
                                **kwargs):
    """Map ``func`` over ``param_lst`` in a process pool, without a progress bar.

    Each element is passed to ``func`` as its single positional argument,
    plus the remaining ``kwargs``, through ``_apply_lst``. ``workers`` is a
    required keyword that sets the pool size and is not forwarded to ``func``.

    Returns:
        A list of results in the same order as ``param_lst``.
    """
    workers = kwargs.pop('workers')
    with Pool(workers) as pool:
        tasks = [([item], func, idx, kwargs)
                 for idx, item in enumerate(param_lst)]
        # imap is already ordered; sorting on the task index is just explicit
        ordered = sorted(pool.imap(_apply_lst, tasks),
                         key=lambda pair: pair[0])
    return [pair[1] for pair in ordered]
def _apply_lst(args):
params, func, num, kwargs = args
return num, func(*params,**kwargs)
def dict_multiprocessing(param_dict,
                         func,
                         **kwargs):
    """Apply ``func`` to every value of ``param_dict`` in a process pool.

    Each value is passed to ``func`` as its single positional argument, plus
    the remaining ``kwargs``, through ``_apply_dct``. The results are returned
    under the same keys.

    Args:
        param_dict: mapping whose values are fed to ``func``.
        func: picklable callable invoked as ``func(value, **kwargs)``.
        workers: (keyword, required) pool size; not forwarded to ``func``.
        **kwargs: extra keyword arguments forwarded to ``func``.

    Returns:
        dict mapping each key of ``param_dict`` to ``func(value, **kwargs)``,
        in ``param_dict``'s iteration order.

    Raises:
        KeyError: if ``workers`` is not given.
    """
    workers = kwargs.pop('workers')
    with Pool(workers) as p:
        apply_lst = [([params], func, i, key, kwargs)
                     for i, (key, params) in enumerate(param_dict.items())]
        result = list(tqdm(p.imap(_apply_dct, apply_lst),
                           total=len(apply_lst)))
    # imap is already ordered; sorting on the task index keeps that explicit.
    result = sorted(result, key=lambda x: x[0])
    # Worker tuples are (index, value, key).
    return {key: value for _, value, key in result}
def _apply_dct(args):
params, func, num, key, kwargs = args
return num, func(*params,**kwargs), key
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment