Last active
December 27, 2017 16:11
-
-
Save jomido/b3bda84be7695811c40e6501c491f05d to your computer and use it in GitHub Desktop.
asyncio pools (threaded and subprocessed)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import time | |
from pools import threads, processes | |
in_thread = threads(10) | |
in_process = processes(4) | |
def fib(n): | |
if n == 0: | |
return 0 | |
elif n == 1: | |
return 1 | |
else: | |
return fib(n-1)+fib(n-2) | |
def compute(n): | |
return fib(n) | |
@in_thread | |
def threaded_compute(n): | |
""" | |
This should be the slowest. Use @in_thread for blocking IO (i.e. requests lib). | |
""" | |
return fib(n) | |
@in_process | |
def parallel_compute(n): | |
return fib(n) | |
async def main(): | |
n = 25 | |
start = time.perf_counter() | |
results = [compute(n) for i in range(20)] | |
duration = time.perf_counter() - start | |
print("{} seconds for local compute.".format(duration)) | |
start = time.perf_counter() | |
results = await asyncio.gather(*[threaded_compute(n) for i in range(20)]) | |
duration = time.perf_counter() - start | |
print("{} seconds for threaded compute.".format(duration)) | |
start = time.perf_counter() | |
results = await asyncio.gather(*[parallel_compute(n) for i in range(20)]) | |
duration = time.perf_counter() - start | |
print("{} seconds for subprocessed compute.".format(duration)) | |
if __name__ == "__main__": | |
loop = asyncio.get_event_loop() | |
future = asyncio.ensure_future(main()) | |
loop.run_until_complete(future) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor | |
from functools import wraps, partial | |
import dill | |
def serialized(f, *args, **kwargs): | |
f = dill.loads(f) | |
args = map(dill.loads, args) | |
kwargs = {k: dill.loads(v) for k, v in kwargs.items()} | |
result = f(*args, **kwargs) | |
return dill.dumps(result) | |
def serialize(f, *args, **kwargs): | |
f = dill.dumps(f) | |
args = map(dill.dumps, args) | |
kwargs = {k: dill.dumps(v) for k, v in kwargs.items()} | |
return partial( | |
serialized, | |
f, | |
*args, | |
**kwargs | |
) | |
def threads(max_workers=1): | |
def decorator(f): | |
pool = ThreadPoolExecutor(max_workers=max_workers) | |
@wraps(f) | |
async def wrapper(*args, **kwargs): | |
future = pool.submit(f, *args, **kwargs) | |
future = asyncio.wrap_future(future) | |
result = await future | |
return result | |
return wrapper | |
return decorator | |
def processes(max_workers=1): | |
def decorator(f): | |
pool = ProcessPoolExecutor(max_workers=max_workers) | |
@wraps(f) | |
async def wrapper(*args, **kwargs): | |
fn = serialize(f, *args, **kwargs) | |
future = pool.submit(fn) | |
future = asyncio.wrap_future(future) | |
result = await future | |
return dill.loads(result) | |
return wrapper | |
return decorator |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment