Skip to content

Instantly share code, notes, and snippets.

@tag1216
Last active March 9, 2021 18:06
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tag1216/f5daedee54c0ba93fbfef7243fcf6f3b to your computer and use it in GitHub Desktop.
Python concurrent.futures pipe
import queue
import random
import time
from concurrent.futures import ThreadPoolExecutor, Executor, Future
from contextlib import contextmanager
from threading import Thread
from tqdm import tqdm
class MyExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor whose internal work queue is bounded.

    After the normal ThreadPoolExecutor setup, the executor's private
    ``_work_queue`` is replaced with ``queue.Queue(maxsize=queue_size)``.
    Per ``queue.Queue`` semantics, a ``queue_size`` <= 0 means unbounded.

    NOTE(review): the intent appears to be back-pressure (``submit``/``map``
    blocking once ``queue_size`` items are waiting). That depends on
    CPython's ThreadPoolExecutor putting work items on ``_work_queue`` and
    creating worker threads lazily, after this swap. These are private
    implementation details; re-check them when upgrading Python.
    """

    def __init__(self, max_workers, queue_size):
        super().__init__(max_workers=max_workers)
        bounded = queue.Queue(maxsize=queue_size)
        self._work_queue = bounded
def proc1(args):
    """First pipeline stage: sleep ``n1`` seconds, then pass the record on.

    Args:
        args: A 3-item sequence ``(i, n1, n2)``. A sequence of any other
            length raises ``ValueError`` on unpacking.

    Returns:
        The same three values as a tuple ``(i, n1, n2)``.
    """
    index, first_delay, second_delay = args
    time.sleep(first_delay)
    return (index, first_delay, second_delay)
def proc2(args):
    """Second pipeline stage: sleep ``n2`` seconds, then pass the record on.

    Args:
        args: A 3-item sequence ``(i, n1, n2)``. A sequence of any other
            length raises ``ValueError`` on unpacking.

    Returns:
        The same three values as a tuple ``(i, n1, n2)``.
    """
    index, first_delay, second_delay = args
    time.sleep(second_delay)
    return (index, first_delay, second_delay)
def submit_all(executor: Executor, fn, iterable):
    """Lazily run ``fn`` over ``iterable`` on ``executor``, yielding results
    in completion order.

    A helper thread pulls items from ``iterable`` and submits them, so a
    slow upstream iterator (e.g. another ``submit_all`` generator) never
    holds back results that have already finished.

    Args:
        executor: Executor that runs ``fn``.
        fn: Callable applied to each item.
        iterable: Items to process. It is consumed on the helper thread.

    Yields:
        ``fn(item)`` for every item, in the order the calls complete.

    Raises:
        The exception raised by ``fn`` when its failed result is reached.
        If iterating ``iterable`` or submitting fails, that exception is
        re-raised after every already-submitted result has been yielded.
    """
    # Completed futures arrive here through add_done_callback. The
    # submitter thread finishes by putting a single (count, error) tuple.
    done_queue = queue.Queue()

    def submit():
        count = 0
        error = None
        try:
            for item in iterable:
                executor.submit(fn, item).add_done_callback(done_queue.put)
                count += 1
        except Exception as exc:  # re-raised on the consumer side below
            error = exc
        finally:
            # Always announce how many futures to expect. Without this,
            # the consumer could block forever on done_queue.get().
            done_queue.put((count, error))

    Thread(target=submit).start()

    expected = None  # unknown until the submitter reports its count
    received = 0
    submit_error = None
    while expected is None or received < expected:
        item = done_queue.get()
        if isinstance(item, Future):
            received += 1
            # result() re-raises fn's exception instead of losing it.
            yield item.result()
        else:
            expected, submit_error = item
    if submit_error is not None:
        raise submit_error
def submit_ordered(executor: Executor, fn, iterable, max_pending=None):
    """Run ``fn`` over ``iterable`` on ``executor``, yielding results in
    input order while allowing calls to overlap.

    Each item is submitted as soon as it is read. Leading results that have
    already finished are yielded between submissions, so output streams
    without waiting for the whole input.

    Args:
        executor: Executor that runs ``fn``.
        fn: Callable applied to each item.
        iterable: Items to process, consumed lazily.
        max_pending: Optional upper bound on submitted-but-not-yet-yielded
            calls. ``None`` (default) imposes no bound. ``1`` reproduces
            strictly one-at-a-time execution.

    Yields:
        ``fn(item)`` for every item, in the same order as ``iterable``.

    Raises:
        ValueError: when iteration starts, if ``max_pending`` is given and
            is < 1.
        Any exception raised by ``fn``, when its result is reached.
    """
    from collections import deque

    if max_pending is not None and max_pending < 1:
        raise ValueError('max_pending must be >= 1 or None')

    pending = deque()
    for item in iterable:
        pending.append(executor.submit(fn, item))
        # Hand back finished leading results without blocking submission.
        while pending and pending[0].done():
            yield pending.popleft().result()
        # Back-pressure: once the window is full, wait for the oldest call.
        if max_pending is not None and len(pending) >= max_pending:
            yield pending.popleft().result()
    while pending:
        yield pending.popleft().result()
@contextmanager
def timer():
    """Print the elapsed seconds spent inside the ``with`` block.

    The elapsed time is printed even if the block raises; the exception
    then propagates unchanged. ``time.perf_counter`` is used because it is
    the clock intended for measuring short intervals.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        print(time.perf_counter() - start)
def tqdm(itr, *args, **kwargs):
    """Yield each item of ``itr`` while a ``tqdm`` progress bar tracks it.

    This is a generator wrapper around the ``tqdm.tqdm`` class, and every
    extra positional/keyword argument is forwarded unchanged. Because it is
    a generator, the import and the progress bar only happen on the first
    ``next()``.
    """
    # Imported under another local name because this function reuses the
    # name ``tqdm``.
    from tqdm import tqdm as progress_bar

    bar = progress_bar(itr, *args, **kwargs)
    for item in bar:
        yield item
def main():
    """Run the demo pipeline twice, printing each result list and its timing.

    Ten random ``(i, n1, n2)`` records flow through ``proc1`` on
    ``MyExecutor(2, 1)`` and then through ``proc2`` on ``MyExecutor(2, 5)``.
    Every stage is wrapped in the ``tqdm`` progress-bar generator.

    - The first pass chains ``Executor.map``, which returns results in
      input order.
    - The second pass chains ``submit_all``, which yields results as they
      complete.
    """
    total = 10
    data = [
        (index, random.randint(1, 5) / 10, random.randint(1, 10) / 10)
        for index in range(total)
    ]

    def bar(iterable, position, desc, **extra):
        # All bars share the same total; ``extra`` carries per-bar options.
        return tqdm(iterable, total=total, position=position, desc=desc, **extra)

    stage1 = MyExecutor(2, 1)
    stage2 = MyExecutor(2, 5)
    with stage1, stage2:
        # Pass 1: Executor.map (submits eagerly, results in input order).
        with timer():
            source = bar(data, 0, 'data', leave=False)
            first = bar(stage1.map(proc1, source), 1, 'proc1', leave=False)
            second = bar(stage2.map(proc2, first), 2, 'proc2', leave=False)
            print(list(second))

        # Pass 2: submit_all. The last bar omits leave=False, so tqdm's
        # default for ``leave`` applies.
        with timer():
            source = bar(data, 0, 'data', leave=False)
            first = bar(submit_all(stage1, proc1, source), 1, 'proc1', leave=False)
            second = bar(submit_all(stage2, proc2, first), 2, 'proc2')
            print(list(second))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment