Last active
September 24, 2022 10:01
-
-
Save JohnStarich/b22daa6f10f413e5f39dc75dfb7531c8 to your computer and use it in GitHub Desktop.
Creates a multiprocessing.pool.ThreadPool whose worker threads each hold a persistent resource, in order to reduce the spin-up time of tasks submitted to the pool.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Demonstration of creating a multiprocessing.pool.ThreadPool in which each
worker thread keeps a persistent resource, reducing the spin-up time of tasks
submitted to the pool.

The PersistentObject supports with-statement contexts to show that it is
cleaned up properly once its worker is told to exit.
""" | |
from __future__ import print_function | |
from multiprocessing.pool import ThreadPool | |
from multiprocessing.dummy import Process as ThreadRunner | |
import uuid | |
import time | |
class PersistentObject(object):
    """
    Stand-in for an expensive resource that lives as long as its worker.

    Each instance gets a random UUID as its ``name`` and announces on stdout
    when it is created and when its ``with`` block is left.
    """

    def __init__(self):
        self.name = uuid.uuid4()
        announcement = "Initializing persistent object %s" % self.name
        print(announcement)

    def __enter__(self):
        # The object itself is the managed resource.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the block propagates.
        farewell = "Cleaning up persistent object %s" % self.name
        print(farewell)
def session_worker(inqueue, outqueue, initializer=None, initargs=(),
                   *args, **kwargs):
    """
    Run loop executed by every TestRunner thread.

    Creates one PersistentObject for the lifetime of the loop and pairs it
    with every item of each task before handing the task to its function.
    Modeled after the built-in pool worker so the ThreadPool can drive it.

    Arguments:
        inqueue: Source of tasks; each task is a
            ``(job, i, func, args, kwds)`` tuple, or ``None`` to stop.
        outqueue: Sink for ``(job, i, (success, value))`` results.
        initializer: Optional callable run once before the loop starts.
        initargs: Positional arguments for ``initializer``.
        *args, **kwargs: Accepted and ignored.
    """
    put = outqueue.put
    get = inqueue.get

    # Pipe-backed queues expose these private ends; close the ones this
    # worker never uses.
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    with PersistentObject() as persistent:
        while True:
            try:
                incoming = get()
            except (EOFError, IOError):
                print('worker got EOFError or IOError -- exiting')
                break

            if incoming is None:
                print('worker got sentinel -- exiting')
                break

            job, i, func, call_args, call_kwds = incoming
            try:
                # The first positional argument is a (function, items) pair.
                inner_func, items = call_args[0]
                # Pair every item with this worker's persistent object.
                paired_items = tuple((item, persistent) for item in items)
                patched_args = ((inner_func, paired_items),) + call_args[1:]
                result = (True, func(*patched_args, **call_kwds))
            except Exception as exc:
                # Report the failure through the result queue instead of
                # letting it end the worker loop.
                result = (False, exc)
            put((job, i, result))
class TestRunner(ThreadRunner):
    """
    Thread-backed executor used by the pool.

    Whatever ``target`` the caller supplies is discarded; the thread always
    runs ``session_worker`` so the persistent object is injected into the
    task arguments.
    """

    def __init__(self, target=None, *args, **kwargs):
        # `target` is accepted only so callers may pass it; it is replaced.
        kwargs['target'] = session_worker
        super(TestRunner, self).__init__(*args, **kwargs)
class PersistentThreadPool(ThreadPool):
    """
    ThreadPool whose workers each carry a persistent object.

    The persistent object lives for the life of the worker thread and is
    injected into the arguments of every task submitted to the pool.
    Overriding ``Process`` with TestRunner is what swaps the worker target
    for our own run loop.
    """

    # NOTE(review): the calling convention of the pool's Process hook may
    # differ between Python versions -- confirm that this override is still
    # honoured on the interpreter you run this with.
    Process = TestRunner
def plus_one(args, delay=0.1):
    """
    Add one to the input number and return it.

    Prints the name of the persistent object it was handed, then sleeps for
    ``delay`` seconds so that other worker threads get a chance to run, too.

    Arguments:
        args: A single ``(num, persistent)`` tuple. ``persistent`` must
            expose a ``name`` attribute.
        delay: Seconds to sleep before returning. Defaults to 0.1, the
            pause this demo has always used.

    Returns:
        ``num + 1``.
    """
    num, persistent = args
    print(persistent.name)
    time.sleep(delay)
    return num + 1
if __name__ == '__main__':
    # Two workers share the ten inputs, so each persistent object is reused
    # across several tasks.
    pool = PersistentThreadPool(2)
    try:
        result = pool.map(plus_one, range(10))
    except Exception:
        print("Error")
        raise
    finally:
        # Shut the pool down explicitly: close() stops new submissions and
        # join() waits for the worker threads to finish, so each worker
        # leaves its run loop and cleans up its persistent object before
        # the script reports completion.
        pool.close()
        pool.join()
    print("Done")
    print("Result: %s" % result)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment