Skip to content

Instantly share code, notes, and snippets.

@weaming
Created January 4, 2021 03:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save weaming/4076d68287a40e4530bdc378f7e32dd2 to your computer and use it in GitHub Desktop.
Save weaming/4076d68287a40e4530bdc378f7e32dd2 to your computer and use it in GitHub Desktop.
import queue
import threading
import traceback
import sys
def map_do(fn, iterable, n=20):
    """Run ``fn`` over every item of ``iterable`` using a thread pool.

    Args:
        fn: Callable taking a single item.
        iterable: Items to feed to ``fn``.
        n: Maximum number of pool threads.

    Returns:
        A list with the results produced by ``executor.map``.
    """
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=n) as pool:
        # Materialise the lazy result iterator before handing it back.
        return list(pool.map(fn, iterable))
def safe(fn):
    """Call ``fn()`` and print, rather than propagate, ordinary exceptions.

    ``queue.Empty`` is the one exception that is re-raised to the caller;
    every other ``Exception`` has its traceback printed and is then
    swallowed. The return value of ``fn`` is discarded.
    """
    try:
        fn()
    except Exception as err:
        if isinstance(err, queue.Empty):
            # Let the caller see that the queue ran dry.
            raise
        traceback.print_exc()
def worker(done, fn, _):
    """Call ``fn`` repeatedly (through ``safe``) until told to stop.

    The loop ends when ``fn`` raises ``queue.Empty`` or when a short wait
    on ``done`` reports that it was notified. A message naming the current
    thread is written to stderr on exit.

    Args:
        done: Condition object polled between calls for a stop signal.
        fn: Zero-argument callable to run on each iteration.
        _: Unused; present so the function can be driven by a map call.
    """
    keep_going = True
    while keep_going:
        try:
            safe(fn)
        except queue.Empty:
            keep_going = False
        else:
            # Briefly wait on the condition; a truthy result means stop.
            with done:
                keep_going = not done.wait(0.001)
    print(f'exited: {threading.current_thread().name}', file=sys.stderr)
class _StopCondition(threading.Condition):
    """Condition whose ``notify_all`` is latched until ``reset``.

    A plain ``Condition`` only wakes threads that are blocked in ``wait``
    at the moment of the notification; a thread that is busy elsewhere
    never learns about it. This subclass remembers the notification so
    that every later ``wait`` returns ``True`` immediately.
    """

    def __init__(self, lock=None):
        super().__init__(lock)
        self._stop_requested = False

    def wait(self, timeout=None):
        """Return ``True`` at once if a stop was requested, else wait."""
        if self._stop_requested:
            return True
        return super().wait(timeout)

    def notify_all(self):
        """Wake current waiters and latch the stop request."""
        super().notify_all()
        self._stop_requested = True

    def reset(self):
        """Clear a previously latched stop request."""
        self._stop_requested = False


class Pool:
    """Runs a callable in a loop on groups of worker threads.

    Attributes are plain instance fields:
      n       -- number of worker loops started per ``go`` call
      threads -- launcher threads started by ``go``
      done    -- condition used to tell the workers to stop
    """

    def __init__(self, n=10):
        """Create an idle pool that starts ``n`` workers per ``go`` call."""
        self.n = n
        self.threads = []
        # A latching condition: workers that are busy when join() signals
        # still see the stop request on their next check instead of
        # missing it and looping forever.
        self.done = _StopCondition(threading.Lock())

    def go(self, fn):
        """Start ``self.n`` worker loops calling ``fn`` in the background.

        The workers are driven by ``map_do`` from a daemon launcher thread,
        which is recorded in ``self.threads``. Each call adds another
        group of workers.
        """

        def run_one(index):
            return worker(self.done, fn, index)

        def launch():
            return map_do(run_one, range(self.n), n=self.n)

        th = threading.Thread(target=launch, daemon=True)
        th.start()
        self.threads.append(th)

    def join(self):
        """Signal every worker to stop and wait for all launcher threads."""
        with self.done:
            self.done.notify_all()
        for th in self.threads:
            th.join()
        # All workers have exited; clear the latch so the pool can be
        # used again with a later go() call.
        with self.done:
            self.done.reset()
def example():
    """Demo: drain 100 queued integers with two groups of ten workers."""
    jobs = queue.Queue()
    for item in range(100):
        jobs.put(item)

    pool = Pool(10)

    def one():
        # get_nowait raises queue.Empty once the queue has been drained.
        value = jobs.get_nowait()
        print('got', value, 'in thread', threading.current_thread().name)
        jobs.task_done()

    for _ in range(2):
        pool.go(one)

    # Wait until every item is marked done, then stop the workers.
    jobs.join()
    pool.join()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    example()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment