Skip to content

Instantly share code, notes, and snippets.

@mosquito
Created August 31, 2017 14:14
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mosquito/4dd5439949cb26114a97ab5817ca1c2c to your computer and use it in GitHub Desktop.
Save mosquito/4dd5439949cb26114a97ab5817ca1c2c to your computer and use it in GitHub Desktop.
import logging
from concurrent.futures import Executor
from functools import partial
from multiprocessing.pool import ThreadPool as ThreadPoolBase
log = logging.getLogger()
class ThreadPool(ThreadPoolBase, Executor):
def __init__(self, *args, loop, **kwargs):
super().__init__(*args, **kwargs)
self.__loop = loop
def __on_result(self, future, result):
self.loop.call_soon_threadsafe(future.set_result, result)
def __on_exception(self, future, exc):
if self.loop.is_closed():
log.exception(
'Event loop is closed but exception was raised from the thread',
exc_info=exc
)
return
if isinstance(exc, StopIteration):
try:
raise StopAsyncIteration(*exc.args) from exc
except StopAsyncIteration as e:
exc = e
self.loop.call_soon_threadsafe(future.set_exception, exc)
@property
def loop(self):
return self.__loop
def submit(self, func, *args, **kwargs):
result_future = self.loop.create_future()
self.apply_async(
func=func,
args=args,
kwds=kwargs,
callback=partial(self.__on_result, result_future),
error_callback=partial(self.__on_exception, result_future),
)
return result_future
def shutdown(self, wait=True):
self.close()
if wait:
self.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment