Skip to content

Instantly share code, notes, and snippets.

@minstrel271
Last active March 11, 2016 08:37
Show Gist options
  • Save minstrel271/46ca59577b872f767e55 to your computer and use it in GitHub Desktop.
Save minstrel271/46ca59577b872f767e55 to your computer and use it in GitHub Desktop.
from concurrent.futures import as_completed
import warnings
def async_map(executor, func, *iterables):
"""Execute the given function asynchronously and yield the args-result pairs.
Args:
executor (Executor): An opened concurrent.future.Executor.
func (function(*args)): Function to be executed.
*iterables (Iterables): Parameters to be applied on func.
>>> from concurrent.futures import ThreadPoolExecutor as Executor
>>> from math import sqrt, floor
>>> def is_prime(n):
... return all(n % i for i in range(2, floor(sqrt(n))+1))
>>> with Executor(max_workers=4) as executor:
... l = list(async_map(executor, is_prime, (2, 4)))
>>> l.sort()
>>> l
[(2, True), (4, False)]
"""
if len(iterables) > 1:
future_to_args = {executor.submit(func, *args): args
for args in zip(*iterables)}
else:
future_to_args = {executor.submit(func, arg): arg
for arg in iterables[0]}
completed = as_completed(future_to_args.keys())
for future in completed:
args = future_to_args[future]
if future.exception() is None:
yield args, future.result()
else:
message = '{} in {}{}: {}'.format(
future.exception().__class__.__name__,
func.__name__, args, future.exception())
warnings.warn(message, RuntimeWarning)
if __name__ == '__main__':
import doctest
doctest.testmod()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment