Skip to content

Instantly share code, notes, and snippets.

@jomido
Last active December 27, 2017 16:11
Show Gist options
  • Save jomido/b3bda84be7695811c40e6501c491f05d to your computer and use it in GitHub Desktop.
asyncio pools (threaded and subprocessed)
import asyncio
import time
from pools import threads, processes
# Decorator factories from pools: @in_thread runs the wrapped function on a
# 10-worker thread pool, @in_process on a 4-worker process pool; both make the
# call awaitable from asyncio code.
in_thread = threads(10)
in_process = processes(4)
def fib(n):
    """Return the n-th Fibonacci number via naive double recursion.

    Deliberately exponential-time: it serves as a CPU-bound workload for
    the benchmarks below.
    """
    if n == 0:
        return 0
    if n == 1:
        return 1
    return fib(n - 1) + fib(n - 2)
def compute(n):
    """Evaluate fib(n) synchronously in the calling thread (the baseline case)."""
    result = fib(n)
    return result
@in_thread
def threaded_compute(n):
    """Evaluate fib(n) on the shared thread pool; the call becomes awaitable.

    Expected to be the slowest variant here: the GIL prevents CPU-bound work
    from running in parallel across threads. @in_thread pays off for blocking
    IO (e.g. the requests library), not for pure computation.
    """
    return fib(n)
@in_process
def parallel_compute(n):
    """Evaluate fib(n) on the process pool; runs truly in parallel across cores."""
    return fib(n)
async def main():
    """Time fib(25) twenty times three ways: inline, thread pool, process pool."""
    n = 25

    # Baseline: plain synchronous calls on the event-loop thread.
    started = time.perf_counter()
    _ = [compute(n) for _ in range(20)]
    elapsed = time.perf_counter() - started
    print("{} seconds for local compute.".format(elapsed))

    # Thread pool: awaitable, but CPU-bound work still serializes on the GIL.
    started = time.perf_counter()
    _ = await asyncio.gather(*(threaded_compute(n) for _ in range(20)))
    elapsed = time.perf_counter() - started
    print("{} seconds for threaded compute.".format(elapsed))

    # Process pool: genuine parallelism across worker processes.
    started = time.perf_counter()
    _ = await asyncio.gather(*(parallel_compute(n) for _ in range(20)))
    elapsed = time.perf_counter() - started
    print("{} seconds for subprocessed compute.".format(elapsed))
if __name__ == "__main__":
    # run_until_complete accepts a coroutine directly and wraps it itself;
    # the extra asyncio.ensure_future() call in the original was redundant.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import wraps, partial
import dill
def serialized(f, *args, **kwargs):
    """Unpickle a dill-serialized callable plus its arguments, call it, and
    return the dill-pickled result.

    Counterpart to serialize(): runs inside the worker process, so everything
    crosses the process boundary as dill byte strings.
    """
    target = dill.loads(f)
    positional = [dill.loads(blob) for blob in args]
    keywords = {name: dill.loads(blob) for name, blob in kwargs.items()}
    outcome = target(*positional, **keywords)
    return dill.dumps(outcome)
def serialize(f, *args, **kwargs):
    """Dill-pickle a callable and its arguments into a zero-argument partial.

    The returned partial wraps serialized(), so it can be submitted to a
    ProcessPoolExecutor even when f itself (e.g. a decorated closure) is not
    picklable by the standard pickle module.
    """
    pickled_fn = dill.dumps(f)
    pickled_args = [dill.dumps(a) for a in args]
    pickled_kwargs = {name: dill.dumps(value) for name, value in kwargs.items()}
    return partial(serialized, pickled_fn, *pickled_args, **pickled_kwargs)
def threads(max_workers=1):
    """Decorator factory: run the wrapped sync function on a dedicated
    ThreadPoolExecutor and make each call awaitable from asyncio.

    Each decorated function gets its own pool of max_workers threads,
    created once at decoration time.
    """
    def decorator(f):
        executor = ThreadPoolExecutor(max_workers=max_workers)

        @wraps(f)
        async def wrapper(*args, **kwargs):
            # Bridge the concurrent.futures Future into an asyncio future
            # so the caller can simply await it.
            pending = executor.submit(f, *args, **kwargs)
            return await asyncio.wrap_future(pending)

        return wrapper
    return decorator
def processes(max_workers=1):
    """Decorator factory: run the wrapped sync function on a dedicated
    ProcessPoolExecutor and make each call awaitable from asyncio.

    Arguments and the return value are shuttled across the process
    boundary as dill pickles (via serialize/serialized), so functions the
    standard pickle module rejects still work.
    """
    def decorator(f):
        executor = ProcessPoolExecutor(max_workers=max_workers)

        @wraps(f)
        async def wrapper(*args, **kwargs):
            # Package f and its arguments as a picklable zero-arg callable.
            job = serialize(f, *args, **kwargs)
            raw = await asyncio.wrap_future(executor.submit(job))
            # Workers hand back a dill pickle; decode it for the caller.
            return dill.loads(raw)

        return wrapper
    return decorator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment