import queue
import sys
import threading
import traceback
def map_do(fn, iterable, n=20):
    """Apply *fn* to every item of *iterable* on a pool of *n* threads.

    Args:
        fn: One-argument callable applied to each item.
        iterable: Items to process.
        n: Maximum number of worker threads (default 20).

    Returns:
        A list of the results, in the same order as *iterable*.

    Raises:
        Whatever *fn* raises; the first failing call is re-raised while the
        results are being collected.
    """
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=n) as executor:
        # Materialise inside the ``with`` so all work is done (and any
        # exception surfaces) before the executor is shut down.
        return list(executor.map(fn, iterable))
def safe(fn):
    """Call ``fn()`` and keep going if it fails.

    ``queue.Empty`` is deliberately let through so the caller can treat it
    as "no more work". Any other ``Exception`` is reported on stderr via
    ``traceback.print_exc()`` and then swallowed.

    Args:
        fn: Zero-argument callable to run. Its return value is discarded.

    Raises:
        queue.Empty: Propagated unchanged from ``fn``.
    """
    try:
        fn()
    except queue.Empty:
        raise
    except Exception:
        # Best effort: log the failure but do not let one bad task stop
        # whoever is calling us in a loop.
        traceback.print_exc()
def worker(done, fn, _):
    """Run ``fn`` repeatedly until the work runs out or a stop is signalled.

    Each iteration calls ``safe(fn)``. The loop ends when either

    * ``fn`` raises ``queue.Empty`` (re-raised by ``safe``), or
    * ``done.wait(0.001)`` returns a true value, i.e. the condition was
      notified during that one-millisecond wait.

    On exit a line naming the current thread is written to stderr.

    Args:
        done: Condition-like object; it is used as a context manager and
            its ``wait(timeout)`` method is polled between tasks.
        fn: Zero-argument task to run on every iteration.
        _: Ignored. Present so the function can be used as a one-argument
            mapper with the item bound to this slot.
    """
    while True:
        try:
            safe(fn)
        except queue.Empty:
            break
        # The lock must be held to wait on the condition; ``wait`` releases
        # it for the duration of the timeout and re-acquires it on return.
        with done:
            if done.wait(0.001):
                break
    print(f'exited: {threading.current_thread().name}', file=sys.stderr)
class _LatchingCondition(threading.Condition):
    """Condition variable that remembers ``notify_all`` was called.

    A plain ``threading.Condition`` only wakes threads that are inside
    ``wait`` at the instant of the notification; a thread that starts
    waiting afterwards never sees it. This subclass latches the signal so
    every later ``wait`` returns ``True`` immediately, until ``clear`` is
    called.
    """

    def __init__(self, lock=None):
        super().__init__(lock)
        self._signalled = False

    def notify_all(self):
        """Wake current waiters and latch the signal for future waiters."""
        # Delegating first keeps the usual RuntimeError when the lock is not
        # held, without latching on a failed call.
        super().notify_all()
        self._signalled = True

    def wait(self, timeout=None):
        """Return True at once if already signalled, else wait as usual."""
        if self._signalled:
            return True
        return super().wait(timeout)

    def clear(self):
        """Forget a previous ``notify_all`` so the condition can be reused."""
        self._signalled = False


class Pool:
    """Groups of worker threads that keep calling a task until stopped.

    Attributes:
        n: Number of workers started by each ``go`` call.
        threads: Supervisor threads, one per ``go`` call.
        done: Condition used to tell the workers to stop.
    """

    def __init__(self, n=10):
        self.n = n
        self.threads = []
        # Latching, so a stop request is not lost on workers that are busy
        # running their task (or have not started yet) when it is sent.
        self.done = _LatchingCondition(threading.Lock())

    def go(self, fn):
        """Start ``self.n`` workers that repeatedly run ``fn``.

        A daemon supervisor thread is started which runs ``map_do`` over
        ``range(self.n)``, so it finishes only when all of its workers have
        exited. The supervisor is recorded in ``self.threads``.

        Args:
            fn: Zero-argument task handed to every worker.
        """
        th = threading.Thread(
            target=lambda: map_do(
                lambda _: worker(self.done, fn, _), range(self.n), n=self.n
            ),
            daemon=True,
        )
        th.start()
        self.threads.append(th)

    def join(self):
        """Signal every worker to stop and wait for all supervisors to end."""
        with self.done:
            self.done.notify_all()
        # Join outside the lock: workers need to acquire it to observe the
        # stop signal.
        for th in self.threads:
            th.join()
        # All workers are gone now; drop the latch so ``go`` can be used
        # again on this pool, as it could before.
        with self.done:
            self.done.clear()
def example():
    """Demo: drain a queue of 100 integers with two groups of 10 workers.

    Each task pulls one item without blocking, prints it together with the
    current thread name, and marks it done. ``get_nowait`` raises
    ``queue.Empty`` once the queue is drained. The function
    waits for the queue to be fully processed, then joins the pool.
    """
    from queue import Queue

    q = Queue()
    # Plain loop: the puts are done purely for their side effect.
    for item in range(100):
        q.put(item)
    pool = Pool(10)

    def one():
        print('got', q.get_nowait(), 'in thread', threading.current_thread().name)
        q.task_done()

    pool.go(one)
    pool.go(one)
    q.join()
    pool.join()
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    example()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment