Skip to content

Instantly share code, notes, and snippets.

@andreycizov
Created April 4, 2016 13:53
Show Gist options
  • Save andreycizov/386106c96e742e67c7e1709bb7adab8b to your computer and use it in GitHub Desktop.
Save andreycizov/386106c96e742e67c7e1709bb7adab8b to your computer and use it in GitHub Desktop.
concurrent.futures ThreadProcessPoolExecutor
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import _process_worker as _old_process_worker
from threading import Thread
# The per-thread worker is the standard library's process-pool worker loop,
# reused unchanged; the alias only names the role it plays in this module
# (it is the ``target`` of every thread started by ``_process_worker`` below).
_thread_worker = _old_process_worker
def _process_worker(max_threads, call_queue, result_queue):
    """Run ``max_threads`` copies of the worker loop inside one process.

    Meant to be used as the ``target`` of a worker process: it starts
    ``max_threads`` threads, each executing ``_thread_worker`` against the
    same pair of queues, and then blocks until every one of those threads
    has returned.

    Args:
        max_threads: Number of worker threads to start in this process.
        call_queue: Passed unchanged to every worker thread (presumably the
            queue the worker loops take their work items from).
        result_queue: Passed unchanged to every worker thread (presumably
            the queue the worker loops report results to).
    """
    # NOTE(review): the worker loop is invoked with exactly
    # (call_queue, result_queue). It is a private standard-library function,
    # so confirm that the target interpreter's version still accepts that
    # two-argument call.
    threads = []
    for _ in range(max_threads):
        worker = Thread(target=_thread_worker, args=(call_queue, result_queue))
        # Daemon threads never keep the process alive on their own; the
        # explicit joins below are what make the process wait for them.
        worker.daemon = True
        worker.start()
        threads.append(worker)

    # NOTE(review): this returns only once *every* worker loop has returned.
    # Verify that executor shutdown wakes all threads of the process, not
    # just one of them; otherwise these joins may block indefinitely.
    for worker in threads:
        worker.join()
class ThreadProcessPoolExecutor(ProcessPoolExecutor):
    """Process pool whose worker processes each run several worker threads.

    Behaves like ``ProcessPoolExecutor`` except that every worker process is
    started with ``_process_worker`` as its target, which fans the work out
    over ``max_threads`` threads sharing the executor's queues.

    NOTE(review): this class overrides the private ``_adjust_process_count``
    hook and reads the private attributes ``_processes``, ``_max_workers``,
    ``_call_queue`` and ``_result_queue``, none of which are assigned here
    (presumably they are set up by ``ProcessPoolExecutor.__init__``). Confirm
    they exist with these meanings on the target interpreter version.
    """

    def __init__(self, max_threads=None, max_workers=None):
        """Create the executor.

        Note the parameter order: ``max_threads`` comes *first*, so a single
        positional argument sets the thread count, not the process count.

        Args:
            max_threads: Number of threads started in each worker process.
                ``None`` selects ``os.cpu_count()``, falling back to 1 when
                that is unavailable.
            max_workers: Forwarded unchanged to ``ProcessPoolExecutor``.

        Raises:
            ValueError: If ``max_threads`` is zero or negative.
        """
        if max_threads is None:
            max_threads = os.cpu_count() or 1
        elif max_threads <= 0:
            raise ValueError("max_threads must be greater than 0")
        # Stored before the base initializer runs so the value is already in
        # place for any later call to _adjust_process_count().
        self._max_threads = max_threads
        super().__init__(max_workers)

    def _adjust_process_count(self):
        """Start worker processes until ``_max_workers`` of them are tracked.

        Each new process runs ``_process_worker`` with this executor's thread
        count and queues, and is recorded in ``_processes`` under its pid.
        Does nothing when enough processes are already tracked.
        """
        for _ in range(len(self._processes), self._max_workers):
            process = multiprocessing.Process(
                target=_process_worker,
                args=(
                    self._max_threads,
                    self._call_queue,
                    self._result_queue,
                ),
            )
            # Start first: the pid used as the dictionary key is only
            # assigned once the process has been started.
            process.start()
            self._processes[process.pid] = process
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment