@msalahi
Created July 7, 2013 18:23
performance fix for ProcessPoolExecutor: a subclass whose map() batches the inputs into chunks and submits one task per chunk rather than one task per item, amortizing pickling and inter-process round trips. A Monte Carlo integration benchmark below compares it against a serial baseline and multiprocessing.Pool.map.
import concurrent.futures
import multiprocessing
import random
import time
from functools import partial
# from fast import FastProcessPoolExecutor  # not needed here: the class is defined below


def fn2(fn, chunk):
    # 'chunk' is a list of slices, one per input iterable; zip them back
    # together and apply fn to each argument tuple.
    return [fn(*args) for args in zip(*chunk)]


class FastProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    def map(self, fn, *iterables, timeout=None, chunksize=None):
        iter_len = min(len(i) for i in iterables)
        fn3 = partial(fn2, fn)
        if chunksize is None:
            # default to roughly four chunks per worker process
            chunksize, extra = divmod(iter_len, self._max_workers * 4)
            if extra:
                chunksize += 1
        if timeout is not None:
            end_time = timeout + time.time()
        # submit one task per chunk instead of one task per item
        fs = [self.submit(fn3, [iterable[ch:ch + chunksize]
                                for iterable in iterables])
              for ch in range(0, iter_len, chunksize)]
        try:
            for future in fs:
                if timeout is None:
                    for i in future.result():
                        yield i
                else:
                    for i in future.result(end_time - time.time()):
                        yield i
        finally:
            for future in fs:
                future.cancel()
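# Worked example of the default chunking (illustration, not in the
# original gist): with num_samples = 10**4 below and, say, 4 worker
# processes, divmod(10000, 4 * 4) gives chunksize = 625, so map()
# submits only 16 tasks of 625 samples each instead of 10,000
# single-item tasks.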
def f(x):
    return x ** 2


def futures_integrate(func, start, end, num_samples=1000):
    """uses random sampling to integrate func from 'start' to 'end'"""
    with FastProcessPoolExecutor() as executor:
        random_samples = \
            [random.uniform(start, end) for i in range(num_samples)]
        total = sum(executor.map(func, random_samples))
    return (end - start) * total / num_samples


def baseline_integrate(func, start, end, num_samples=1000):
    """simple, single-process approach"""
    samples = (random.uniform(start, end) for i in range(num_samples))
    total = sum(func(i) for i in samples)
    return (end - start) * total / num_samples


# The pool of processes has to be instantiated out here: if it lives
# inside the function, the timing loop makes it get reinstantiated
# faster than it can be garbage-collected.
pool = multiprocessing.Pool(processes=20)


def processing_integrate(func, start, end, num_samples=1000):
    """uses a multiprocessing pool"""
    samples = (random.uniform(start, end) for i in range(num_samples))
    total = sum(pool.map(func, samples))
    return (end - start) * total / num_samples


def time_it(f, times=10):
    """average wall-clock time of f() over 'times' runs"""
    timings = []
    for i in range(times):
        start = time.time()
        f()
        stop = time.time()
        timings.append(stop - start)
    return sum(timings) / len(timings)


args = f, 0, 100
kwargs = {"num_samples": 10 ** 4}

print("baseline")
print(time_it(partial(baseline_integrate, *args, **kwargs)))
print("multiprocessing")
print(time_it(partial(processing_integrate, *args, **kwargs)))
print("futures")
print(time_it(partial(futures_integrate, *args, **kwargs)))
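
A minimal usage sketch (not part of the original gist) of calling the chunked map directly with two input iterables and an explicit chunksize; the worker function 'add', the worker count, and the chunk size are illustrative choices, and, like the benchmark above, it assumes a fork-based platform so module-level code is not re-executed in the worker processes.

def add(x, y):
    return x + y

with FastProcessPoolExecutor(max_workers=4) as executor:
    # 100 items with chunksize=25 -> 4 submitted tasks instead of 100
    pair_sums = list(executor.map(add,
                                  list(range(100)),
                                  list(range(100, 200)),
                                  chunksize=25))
print(pair_sums[:5])  # [100, 102, 104, 106, 108]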