Skip to content

Instantly share code, notes, and snippets.

@cpdean
Forked from msalahi/monte_carlo.py
Last active December 19, 2015 10:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cpdean/5943545 to your computer and use it in GitHub Desktop.
Save cpdean/5943545 to your computer and use it in GitHub Desktop.
import concurrent.futures
import multiprocessing
import random
import time
from functools import partial
def fn2(fn, chunk):
    """Evaluate fn once per position in a chunk of argument columns.

    `chunk` is a list holding one slice per input iterable; zipping the
    slices together regroups them into per-call argument tuples.
    """
    argument_tuples = zip(*chunk)
    return [fn(*argument_tuple) for argument_tuple in argument_tuples]
class FastProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    """ProcessPoolExecutor whose map() batches items into chunks.

    Dispatching one pickled task per item dominates runtime for cheap
    functions; sending slices of the inputs to each worker amortizes the
    IPC cost across many calls.
    """

    @staticmethod
    def _run_chunk(fn, chunk):
        # `chunk` holds one slice per input iterable; zip(*chunk) regroups
        # the slices into per-call argument tuples.
        return [fn(*args) for args in zip(*chunk)]

    def map(self, fn, *iterables, timeout=None, chunksize=None):
        """Chunked replacement for Executor.map.

        The iterables must be sequences (len() and slicing are used).
        Returns an iterator yielding results in input order; the iterator
        raises concurrent.futures.TimeoutError if a result is not ready
        within `timeout` seconds of this call.
        """
        if timeout is not None:
            # Fix: start the deadline clock when map() is called, matching
            # stdlib semantics, instead of lazily on first iteration.
            end_time = timeout + time.time()
        iter_len = min(len(it) for it in iterables)
        if chunksize is None:
            # Aim for roughly four chunks per worker so stragglers even out.
            chunksize, extra = divmod(iter_len, self._max_workers * 4)
            if extra:
                chunksize += 1
        # Fix: empty iterables produced chunksize == 0, which made
        # range(0, 0, 0) raise ValueError; clamp to a minimum of 1.
        chunksize = max(chunksize, 1)
        task = partial(self._run_chunk, fn)
        # Fix: submit eagerly here rather than inside a generator body,
        # where no work started until the caller's first next().
        futures = [self.submit(task, [it[start:start + chunksize]
                                      for it in iterables])
                   for start in range(0, iter_len, chunksize)]

        def result_iterator():
            try:
                for future in futures:
                    if timeout is None:
                        yield from future.result()
                    else:
                        yield from future.result(end_time - time.time())
            finally:
                # Drop any not-yet-started work if the consumer bails early.
                for future in futures:
                    future.cancel()

        return result_iterator()
def f(x):
    """Benchmark integrand: return the square of its argument."""
    squared = x ** 2
    return squared
def patch_integrate(func, start, end, num_samples=1000):
    """Monte-Carlo integral of func over [start, end] using the patched
    chunking process-pool executor."""
    samples = [random.uniform(start, end) for _ in range(num_samples)]
    with FastProcessPoolExecutor(max_workers=100) as executor:
        total = sum(executor.map(func, samples))
    return (end - start) * total / num_samples
def futures_plantation_integrate(func, start, end, num_samples=1000):
    """Monte-Carlo integral of func over [start, end] fanned out across a
    100-worker thread pool."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
        draws = (random.uniform(start, end) for _ in range(num_samples))
        accumulated = sum(executor.map(func, draws))
    return (end - start) * accumulated / num_samples
def futures_integrate(func, start, end, num_samples=1000):
    """Monte-Carlo integral of func over [start, end] using the stock
    stdlib process-pool executor (one task per sample)."""
    with concurrent.futures.ProcessPoolExecutor(max_workers=500) as executor:
        draws = (random.uniform(start, end) for _ in range(num_samples))
        accumulated = sum(executor.map(func, draws))
    return (end - start) * accumulated / num_samples
def baseline_integrate(func, start, end, num_samples=1000):
    """Single-process Monte-Carlo reference: draw num_samples uniform
    points in [start, end] and average func over them."""
    running_total = 0
    for _ in range(num_samples):
        running_total += func(random.uniform(start, end))
    width = end - start
    return width * running_total / num_samples
# The worker pool must live at module level: if it were created inside
# processing_integrate(), the timing harness would instantiate new pools
# faster than the old ones could be garbage-collected.
pool = multiprocessing.Pool(processes=10)
def processing_integrate(func, start, end, num_samples=1000):
    """Monte-Carlo integral of func over [start, end] dispatched through
    the module-level multiprocessing pool."""
    width = end - start
    draws = (random.uniform(start, end) for _ in range(num_samples))
    return width * sum(pool.map(func, draws)) / num_samples
def time_it(f, times=3):
    """Call f() `times` times and return the mean wall-clock seconds."""
    elapsed_total = 0.0
    for _ in range(times):
        began = time.time()
        f()
        elapsed_total += time.time() - began
    return elapsed_total / times
# Benchmark driver: integrate f(x) = x**2 over [0, 100] with 10,000
# samples, timing each strategy in turn.
args = f, 0, 100
kwargs = {"num_samples": 10 ** 4}
print("baseline")
print(time_it(partial(baseline_integrate, *args, **kwargs)))
print("multiprocessing")
print(time_it(partial(processing_integrate, *args, **kwargs)))
print("patch future")
print(time_it(partial(patch_integrate, *args, **kwargs)))
print("threads")
print(time_it(partial(futures_plantation_integrate, *args, **kwargs)))
# Left disabled by the author — the one-task-per-sample stdlib process
# pool is the slowest variant by far.
#print("stdlib processes")
#print(time_it(partial(futures_integrate, *args, **kwargs)))
@msalahi
Copy link

msalahi commented Jul 7, 2013

if the results this code is spitting out are accurate: http://www.brianafahey.com/hugemistake.png

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment