Skip to content

Instantly share code, notes, and snippets.

@bsamuel-ui
Created June 12, 2019 20:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bsamuel-ui/d5a4f8f2b9a7e36eff96803a6de7df04 to your computer and use it in GitHub Desktop.
Save bsamuel-ui/d5a4f8f2b9a7e36eff96803a6de7df04 to your computer and use it in GitHub Desktop.
An implementation of Executor.map that reads its inputs only on demand.
from functools import partial
from queue import SimpleQueue
def imap(exc, func, src, size=None, return_exc=False):
if size is None:
size = exc._max_workers
if size < 1:
raise ValueError("imap hang if size is less than one.")
results = SimpleQueue()
futures = {}
def complete(slot, fut):
try:
result = slot, fut.result(), None
except BaseException as err:
result = slot, None, err
results.put(result)
def results_get():
slot, out, err = results.get()
if err is not None:
if return_exc:
out = err
else:
raise err
return slot, out
try:
src_iter = iter(src)
free_slot = 0
while True:
if free_slot < size:
slot = free_slot
free_slot += 1
else:
slot, out = results_get()
yield out
try:
data = next(src_iter)
except StopIteration:
futures.pop(slot, None)
break
else:
futures[slot] = fut = exc.submit(func, data)
fut.add_done_callback(partial(complete, slot))
while futures:
slot, out = results_get()
futures.pop(slot, None)
yield out
finally:
for fut in futures.values():
fut.cancel()
# Testing code
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os
from time import sleep
from threading import Lock
def printl(*args, _lock=Lock()):
with _lock:
print(*args)
def add_one(x):
printl(f'run: {x} pid={os.getpid()}')
if x % 2 == 0:
sleep(1)
return x + 1
def yields(n=25):
for x in range(n):
printl(f'get: {x} pid={os.getpid()}')
yield x
outs = set()
with ThreadPoolExecutor(7) as tpe:
for o in imap(tpe, add_one, yields(25), return_exc=True):
printl(f'got: {o} pid={os.getpid()}')
outs.add(o)
printl(f"FIN: {sorted(outs) == list(range(1, 26))}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment