Skip to content

Instantly share code, notes, and snippets.

@moondef
Last active June 16, 2022 20:19
Show Gist options
  • Save moondef/00fbbae860dd770e95be31b5ba378ec6 to your computer and use it in GitHub Desktop.
Save moondef/00fbbae860dd770e95be31b5ba378ec6 to your computer and use it in GitHub Desktop.
from concurrent.futures import Future
from datetime import datetime
import queue
import threading
from time import sleep
import weakref
# Maps each worker thread to the queue it consumes from.  Keys are held
# weakly, so an entry goes away once its thread object is garbage collected.
all_threads_queues = weakref.WeakKeyDictionary()

# Flipped to True by shutdown(); worker loops read it whenever they receive
# the ``None`` wake-up sentinel.
shutdown_happened = False


def shutdown():
    """Signal every registered worker thread to exit, then wait for them.

    Sets the module-level ``shutdown_happened`` flag, puts one ``None``
    sentinel on the queue of every registered thread and finally joins
    each thread.  All queues are signalled before the first ``join`` so
    that no pool has to wait for another pool's threads to finish before
    it is even told to stop.
    """
    global shutdown_happened
    shutdown_happened = True
    # Snapshot the weak dictionary so the set of entries is fixed while we
    # iterate over it twice.
    items = list(all_threads_queues.items())
    for _, work_queue in items:
        work_queue.put(None)
    for thread, _ in items:
        thread.join()


# Run shutdown() when the threading module shuts down at interpreter exit,
# so workers blocked on an empty queue are woken up and can terminate.
# NOTE(review): _register_atexit is a private CPython API (3.9+) -- confirm
# it is available on every interpreter this file must support.
threading._register_atexit(shutdown)
class Worker():
    """A single unit of work: a callable, its arguments and its future.

    Attributes are plain public fields:
      fn     -- the callable to invoke
      args   -- positional arguments passed to ``fn``
      future -- the Future that receives the outcome
    """

    def __init__(self, future, fn, args):
        self.fn = fn
        self.args = args
        self.future = future

    def run(self):
        """Execute the callable and publish its outcome on the future.

        Does nothing if the future was cancelled before it started.
        Otherwise the return value is stored with ``set_result``; if the
        callable raises, the exception is stored with ``set_exception``
        instead of propagating, so callers waiting on the future are
        always released and the calling thread keeps running.
        """
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args)
        except BaseException as exc:
            # Deliberately broad: any failure must reach the future,
            # otherwise Future.result() would block forever.
            self.future.set_exception(exc)
        else:
            self.future.set_result(result)
def thread_target(ref, work_queue):
    """Worker-thread main loop: run queued items until told to stop.

    Args:
        ref: zero-argument callable returning the owning executor, or
            ``None`` once that executor no longer exists (a weak
            reference in practice).
        work_queue: queue yielding objects with a ``run()`` method, or
            ``None`` as a wake-up sentinel.

    On a ``None`` sentinel the loop exits if the executor is gone or the
    module-level ``shutdown_happened`` flag is set; before returning it
    puts the sentinel back so the next worker on the same queue wakes up
    too.  Otherwise the sentinel is discarded and the loop keeps waiting.
    """
    while True:
        work_item = work_queue.get()
        if work_item is not None:
            work_item.run()
            # Drop the reference before blocking on get() again so the
            # finished item is not kept alive by this idle thread.
            del work_item
            continue

        # Call ref() without binding the result: a local strong reference
        # would keep the executor alive for as long as this thread waits.
        if ref() is None or shutdown_happened:
            work_queue.put(None)
            return
class ThreadPoolExecutor():
    """Minimal thread pool that runs callables and hands back futures.

    Worker threads are started lazily, one per submitted task, until
    ``max_workers`` threads exist; after that new tasks simply wait in
    the shared queue.
    """

    def __init__(self, max_workers=None):
        """Create an empty pool.

        Args:
            max_workers: upper bound on the number of worker threads.
                ``None`` picks ``min(32, cpu_count + 4)``.

        Raises:
            ValueError: if ``max_workers`` is zero or negative (such a
                pool could never run anything).
        """
        if max_workers is None:
            import os  # local import: only needed for the default size
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")
        self.max_workers = max_workers
        self.queue = queue.SimpleQueue()
        self.threads = set()

    def execute(self, fn, *args):
        """Schedule ``fn(*args)`` and return a Future for its outcome."""
        future = Future()
        self.queue.put(Worker(future, fn, args))
        if len(self.threads) < self.max_workers:
            # Called when this executor is garbage collected: wake a
            # worker so the threads can notice and exit.  The queue is
            # bound as a default argument so the callback holds no
            # reference to ``self``.
            def wake_worker(_, work_queue=self.queue):
                work_queue.put(None)

            thread = threading.Thread(
                target=thread_target,
                args=(weakref.ref(self, wake_worker), self.queue),
            )
            thread.start()
            self.threads.add(thread)
            # Register the thread so the module-level shutdown hook can
            # signal and join it.
            all_threads_queues[thread] = self.queue
        return future

    def map(self, fn, args):
        """Schedule ``fn(x)`` for every ``x`` in ``args``.

        Returns the list of futures in the order of ``args``.
        """
        return [self.execute(fn, arg) for arg in args]
# DEMO
def longRunningTask(x, delay=2):
    """Simulate slow work: sleep for ``delay`` seconds, then return ``x * 2``.

    Args:
        x: value to double.
        delay: seconds to sleep before returning (defaults to 2).
    """
    sleep(delay)
    return x * 2
def future_callback(f):
    """Print the result of the completed future ``f`` and the current time."""
    print("{} – {}".format(f.result(), datetime.now()))


# Only run the demo when executed as a script, so importing this module
# does not start threads or sleep.
if __name__ == "__main__":
    # ``pool`` instead of ``exec`` to avoid shadowing the builtin.
    pool = ThreadPoolExecutor(max_workers=2)
    futures = pool.map(longRunningTask, [1, 2, 3, 4])
    for f in futures:
        f.add_done_callback(future_callback)
@moondef
Copy link
Author

moondef commented Jun 16, 2022

execution result:

[image]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment