Skip to content

Instantly share code, notes, and snippets.

@hgbrian
Last active February 16, 2020 01:16
Show Gist options
  • Save hgbrian/f2be82701284b3821d61ac6cc12fc958 to your computer and use it in GitHub Desktop.
Save hgbrian/f2be82701284b3821d61ac6cc12fc958 to your computer and use it in GitHub Desktop.
Parallelize a for loop over keyword arguments with dask
import pandas as pd
def expo(x, y):
    """Return a one-row DataFrame with columns ``x``, ``y`` and ``x^y``.

    The ``x^y`` column holds ``x ** y``.
    """
    return pd.DataFrame([[x, y, x**y]], columns=['x', 'y', 'x^y'])
def in_parallel(fn, loops=None, is_product=False, num_workers=8, *args, **kwargs):
    """Call ``fn`` once per combination of looped keyword arguments using dask.

    Every name listed in ``loops`` must also be passed as a keyword argument
    whose value is an iterable. That keyword is removed from ``kwargs`` and
    ``fn`` receives one element of it per call instead; all remaining keyword
    arguments are passed to every call unchanged.

    Args:
        fn: Callable to run for each combination.
        loops: Name (str) or iterable of names of the keyword arguments to
            loop over. ``None`` means no looped arguments.
        is_product: If true, call ``fn`` for every combination of the looped
            values (``itertools.product``). Otherwise the looped values are
            walked in lockstep (``zip``), stopping at the shortest one.
        num_workers: Forwarded to ``dask.compute`` as ``num_workers``.
        *args: Extra positional arguments forwarded to every call. Because
            they follow the defaulted parameters, they can only be supplied
            when ``loops``, ``is_product`` and ``num_workers`` are themselves
            passed positionally.
        **kwargs: Keyword arguments for ``fn``, including the looped ones.

    Returns:
        * ``()`` if no calls were generated (e.g. no loops in zip mode, or an
          empty looped iterable);
        * one concatenated DataFrame (index reset) if every result is a
          ``pandas.DataFrame``;
        * one merged dict if every result is a dict and no key occurs in more
          than one result; with overlapping keys a warning is issued and the
          raw tuple of results is returned;
        * one flattened list if every result is a list;
        * otherwise the raw tuple of results returned by ``dask.compute``.

    Raises:
        KeyError: If a name in ``loops`` was not passed as a keyword argument.
    """
    from itertools import chain, product
    from warnings import warn

    import pandas as pd

    if loops is None:
        loops = []
    elif isinstance(loops, str):
        loops = [loops]

    # Pop the looped arguments eagerly so that ``kwargs`` is guaranteed to
    # contain only the pass-through arguments when the calls are built below.
    loop_flat = []
    for loop_arg in loops:
        try:
            loop_vals = kwargs.pop(loop_arg)
        except KeyError:
            raise KeyError(
                "loop argument {!r} must be passed as a keyword argument".format(loop_arg)
            ) from None
        loop_flat.append([(loop_arg, loop_val) for loop_val in loop_vals])

    combine = product if is_product else zip
    loop_argss = [dict(itms) for itms in combine(*loop_flat)]

    # Nothing to run: return early instead of letting pd.concat fail on an
    # empty result tuple (all() over no results is vacuously true).
    if not loop_argss:
        return ()

    # Imported only once there is work to schedule.
    from dask import compute, delayed

    res = compute(
        *(delayed(fn)(*args, **kwargs, **loop_args) for loop_args in loop_argss),
        num_workers=num_workers
    )

    if all(isinstance(r, pd.DataFrame) for r in res):
        return pd.concat(res).reset_index(drop=True)

    if all(isinstance(r, dict) for r in res):
        merged = {}
        for r in res:
            merged.update(r)
        # More keys in total than distinct keys means some key is repeated.
        if sum(len(r) for r in res) > len(merged):
            warn("overlapping keys, not combining dicts")
            return res
        return merged

    if all(isinstance(r, list) for r in res):
        return list(chain.from_iterable(res))

    return res
# Loop over x only; y=2 is passed unchanged to every call.
res = in_parallel(expo, loops='x', x=[1, 2, 3, 4, 4], y=2)
print(res)

# Loop over x and y in lockstep (zip): stops at the shorter list, so 3 calls.
res = in_parallel(expo, loops=['x', 'y'], x=[1, 2, 3, 4, 5], y=[1, 2, 3])
print(res)

# Loop over every (x, y) combination (product): 5 * 3 = 15 calls.
res = in_parallel(
    expo, loops=['x', 'y'], is_product=True, x=[1, 2, 3, 4, 5], y=[1, 2, 3]
)
print(res)
"""
x y x^y
0 1 2 1
1 2 2 4
2 3 2 9
3 4 2 16
4 4 2 16
x y x^y
0 1 1 1
1 2 2 4
2 3 3 27
x y x^y
0 1 1 1
1 1 2 1
2 1 3 1
3 2 1 2
4 2 2 4
5 2 3 8
6 3 1 3
7 3 2 9
8 3 3 27
9 4 1 4
10 4 2 16
11 4 3 64
12 5 1 5
13 5 2 25
14 5 3 125"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment