
@internetimagery
Created July 4, 2022 09:41
Simple job submitter for multiprocessing.SyncManager
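"""Patch a ``submit`` method onto a multiprocessing manager class.

Submitted calls are queued to the manager's server process, executed there in
a ProcessPoolExecutor, and resolved on the client side through
concurrent.futures.Future objects (see the demo at the bottom).
"""
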
from __future__ import print_function
from threading import Thread
from pickle import dumps, loads
from six.moves.queue import Empty
from itertools import chain, count
from multiprocessing.util import Finalize
from concurrent.futures import Future, ProcessPoolExecutor


def register_submit(manager_cls, timeout=10, id_generator=count(), future_store={}):
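    """Attach a ``submit`` method (and a ``_process_tasks`` server callable) to manager_cls.

    ``timeout`` is how long the server-side pool waits for further jobs before
    it is released; ``id_generator`` and ``future_store`` tie each submitted
    job back to the Future handed to the caller.
    """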
    # Server Side
    def process_tasks(queues, timeout):
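        # Registered on the manager: runs in the server process and starts a
        # thread that feeds submitted jobs into a local process pool.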
        submit_queue, result_queue = loads(queues)

        def read(timeout=None):
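            # Yield jobs from the queue; stop at the shutdown sentinel (None)
            # or once nothing arrives within ``timeout`` seconds.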
            while True:
                try:
                    item = submit_queue.get(block=True, timeout=timeout)
                except Empty:
                    break
                if item is None:
                    break
                yield item
        def chunk():
            try:
                # Block until some work is ready, then kick off
                # some process pools to perform work.
                # If no work comes in within timeout, then release pools
                # and wait for more work.
                for item in read():
                    with ProcessPoolExecutor() as pool:
                        for result in pool.map(_work, chain((item,), read(timeout))):
                            result_queue.put(result)
            finally:
                result_queue.put_nowait(None)

        th = Thread(target=chunk)
        th.start()
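    # Make process_tasks callable through the manager so the client can kick
    # it off inside the server process.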
    manager_cls.register("_process_tasks", process_tasks)

    # Client Side
    def submit(self, func, *args, **kwargs):
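        """Run ``func(*args, **kwargs)`` behind the manager and return a Future for the result."""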
        if not hasattr(self, "_submit_queue"):
            self._submit_queue = submit_queue = self.Queue()
            self._result_queue = self.Queue()
            self.submit_shutdown = Finalize(self, lambda *_: submit_queue.put_nowait(None))

            # Read in futures and process results
            def collect_results():
                for id_, result, err in iter(self._result_queue.get, None):
                    fut = future_store.pop(id_)
                    if err:
                        fut.set_exception(err)
                    else:
                        fut.set_result(result)

            th = Thread(target=collect_results)
            th.start()
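            # Send pickled copies of both queue proxies to the server side and
            # start its job-processing thread.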
            self._process_tasks(dumps((self._submit_queue, self._result_queue)), timeout)

        # Submit tasks to be processed
        id_ = next(id_generator)
        fut = future_store[id_] = Future()
        self._submit_queue.put((id_, func, args, kwargs))
        return fut

    manager_cls.submit = submit


def _work(item):
    # Run inside process pool
    id_, func, args, kwargs = item
    err = None
    result = None
    try:
        result = func(*args, **kwargs)
    except Exception as exc:  # keep a reference; the "as" name is cleared on block exit in Python 3
        err = exc
    return id_, result, err


# DEMO ########
from multiprocessing.managers import SyncManager

register_submit(SyncManager)

if __name__ == "__main__":
    manager = SyncManager()
    manager.start()
    fut = manager.submit(str, 123)
    print(fut, fut.result())
    manager.submit_shutdown()
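A slightly larger variation on the demo, as a sketch that relies only on what the code above registers: an alternative __main__ block that submits several jobs and gathers every result before shutting the queues down.

if __name__ == "__main__":
    manager = SyncManager()
    manager.start()
    futures = [manager.submit(pow, i, 2) for i in range(5)]
    print([fut.result() for fut in futures])  # [0, 1, 4, 9, 16]
    manager.submit_shutdown()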