Skip to content

Instantly share code, notes, and snippets.

@vikramsoni2
Created April 9, 2019 23:58
Show Gist options
  • Save vikramsoni2/485f6459b0b388cd28f2c7c1e32b1e91 to your computer and use it in GitHub Desktop.
Save vikramsoni2/485f6459b0b388cd28f2c7c1e32b1e91 to your computer and use it in GitHub Desktop.
tqdm progress_apply on multiprocessing, showing parallel progress bars.
import numpy as np
import pandas as pd
from multiprocessing import cpu_count, Pool, current_process
from functools import partial
from tqdm import tqdm
def split_df_by_group(_df, entity, chunks):
    """Split ``_df`` into ``chunks`` frames without breaking up any group.

    The distinct values of column ``entity`` are dealt round-robin into
    ``chunks`` buckets (value 0 -> bucket 0, value 1 -> bucket 1, ...), and
    every row follows its entity value, so all rows of one entity always
    land in the same returned frame.

    Args:
        _df: Source DataFrame; it is not modified.
        entity: Name of the column whose values identify a group.
        chunks: Number of frames to produce.

    Returns:
        A list of ``chunks`` DataFrames, each an independent copy of the
        matching rows. When ``chunks`` exceeds the number of distinct
        entity values the trailing frames are empty; when ``chunks`` is
        zero or negative the list is empty.
    """
    # Look the column up once instead of on every iteration.
    keys = _df[entity]
    entities = keys.unique()
    return [
        # entities[i::chunks] is the i-th round-robin bucket of entity values.
        _df[keys.isin(entities[i::chunks])].copy()
        for i in range(chunks)
    ]
def parallel_bars(df):
    """Run a demo per-group calculation on ``df`` behind a tqdm progress bar.

    The bar is labelled with, and positioned at, a worker index parsed from
    the current process name, so that bars from different worker processes
    are drawn on separate terminal rows.

    Args:
        df: DataFrame with at least the columns ``group_id`` and ``feat_0``.

    Returns:
        The result of ``groupby('group_id').progress_apply(...)``: each
        group with an added ``feat_sum`` column.
    """
    # The index is taken from a process name of the form "<prefix>-<n>".
    # When the name does not have that shape (e.g. the function is called
    # directly in the main process), fall back to bar position 0 instead of
    # raising IndexError/ValueError.
    try:
        pid = int(current_process().name.split('-')[1])
    except (IndexError, ValueError):
        pid = 0

    # Registers ``progress_apply`` on pandas objects for this process.
    tqdm.pandas(desc='worker #{}'.format(pid), position=pid)

    def inner_calc(g):
        # Placeholder workload: the column is overwritten on every pass, so
        # the final value is ``feat_0 + 99``; the loop only burns time so the
        # progress bars are visible.
        for i in range(100):
            g['feat_sum'] = g['feat_0'] + i
        return g

    return df.groupby('group_id', sort=False).progress_apply(inner_calc)
if __name__ == "__main__":
    # One worker process (and therefore one progress bar) per CPU core.
    num_process = cpu_count()

    # Synthetic data: 100000 rows of random integers in [1, 10) in two
    # feature columns, grouped into blocks of 100 consecutive rows.
    data = pd.DataFrame(np.random.randint(1,10,size=(100000,2)), columns = ['feat_{}'.format(i) for i in range(2)] )
    data['group_id'] = 'entity_' + np.floor(data.index / 100).astype(int).astype(str)

    # Each worker receives whole groups only.
    df_split = split_df_by_group(data, 'group_id', num_process)

    p = Pool(num_process)
    try:
        p.map(parallel_bars, df_split)
    finally:
        # Always shut the pool down, even if a worker raised, so no worker
        # processes are left behind.
        p.close()
        p.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment