Skip to content

Instantly share code, notes, and snippets.

@JohnStarich
Last active September 24, 2022 10:01
Show Gist options
  • Save JohnStarich/b22daa6f10f413e5f39dc75dfb7531c8 to your computer and use it in GitHub Desktop.
Save JohnStarich/b22daa6f10f413e5f39dc75dfb7531c8 to your computer and use it in GitHub Desktop.
Creates a multiprocessing.pool.ThreadPool with persistent resources in order to reduce spin up time of tasks submitted to the pool
#!/usr/bin/env python3
"""
Demonstrate a multiprocessing.pool.ThreadPool with per-worker persistent state.

Each worker thread creates one persistent resource and reuses it for every
task it runs, which reduces the spin-up time of tasks submitted to the pool.
The PersistentObject supports with-statement contexts to show that the
resource is cleaned up properly when its worker is told to stop.
"""
from __future__ import print_function
from multiprocessing.pool import ThreadPool
from multiprocessing.dummy import Process as ThreadRunner
import uuid
import time
class PersistentObject(object):
    """
    Stand-in for a resource that is created once and reused across tasks.

    Every instance is tagged with a random UUID (``name``) so the printed
    output shows which instance handled which task. The object is a context
    manager: construction prints an "Initializing" message and leaving the
    ``with`` block prints a "Cleaning up" message.
    """

    def __init__(self):
        # A random UUID is enough to tell instances apart in the output.
        self.name = uuid.uuid4()
        print("Initializing persistent object %s" % self.name)

    def __repr__(self):
        return "%s(name=%r)" % (type(self).__name__, str(self.name))

    def __enter__(self):
        """Return the object itself as the ``with`` target."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Announce cleanup; exceptions from the ``with`` body propagate."""
        print("Cleaning up persistent object %s" % self.name)
        # Returning False states explicitly that nothing is suppressed.
        return False
def _inject_persistent(task_args, persistent):
    """
    Pair every item of a task's work list with the persistent object.

    ``task_args[0]`` must unpack into ``(function, items)``; each item is
    replaced by the tuple ``(item, persistent)``. Any further positional
    arguments are passed through unchanged. Unpacking or iteration errors
    propagate to the caller.
    """
    task_func, items = task_args[0]
    paired = tuple((item, persistent) for item in items)
    return ((task_func, paired),) + task_args[1:]


def session_worker(inqueue, outqueue, initializer=None, initargs=(),
                   *args, **kwargs):
    """
    TestRunner run loop.

    Injects a persistent object into the arguments of every task it runs.
    This piece was modeled after the built-in worker implementation so it
    could still be used by the ThreadPool.

    Arguments:
        inqueue: queue the tasks are read from; a task is a tuple
            ``(job, i, func, args, kwds)`` and ``None`` is the stop sentinel.
        outqueue: queue that receives ``(job, i, (success, value))`` tuples.
        initializer: optional callable run once before the loop starts.
        initargs: positional arguments for ``initializer``.
        *args, **kwargs: accepted and ignored, so the pool may pass extra
            worker arguments.

    One PersistentObject is created per call and lives until the loop ends,
    i.e. until the sentinel arrives or reading the queue raises EOFError or
    IOError.
    """
    put = outqueue.put
    get = inqueue.get
    # Pipe-backed queues expose the ends this worker does not use; close them.
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    with PersistentObject() as persistent:
        while True:
            try:
                task = get()
            except (EOFError, IOError):
                print('worker got EOFError or IOError -- exiting')
                break

            if task is None:
                print('worker got sentinel -- exiting')
                break

            # Named task_args so the function's own *args is not rebound.
            job, i, func, task_args, kwds = task
            try:
                # Injection happens inside the try so that a task of the
                # wrong shape is reported as a failed task rather than
                # killing the worker.
                call_args = _inject_persistent(task_args, persistent)
                result = (True, func(*call_args, **kwds))
            except Exception as e:
                result = (False, e)
            put((job, i, result))
class TestRunner(ThreadRunner):
    """
    Worker thread type used by the persistent pool.

    Whatever target the creator asks for is replaced by ``session_worker``,
    the run loop that injects the persistent object into task arguments.
    """

    def __init__(self, target=None, *args, **kwargs):
        # ``target`` is accepted only so the caller's value can be discarded;
        # the thread always runs session_worker instead.
        super(TestRunner, self).__init__(
            *args, target=session_worker, **kwargs
        )
class PersistentThreadPool(ThreadPool):
    """
    A persistent object thread pool.

    Injects a persistent object (for the life of the thread) into the args
    when running a task submitted to the pool.
    Overriding ``Process`` makes the pool build TestRunner workers, which
    swap the worker target for our own run loop.
    """

    @staticmethod
    def Process(ctx=None, *args, **kwargs):
        """
        Create one TestRunner worker for the pool.

        Python 3.8+ calls this factory as ``Process(ctx, target=..., ...)``,
        while older versions pass keyword arguments only. The context is
        meaningless for threads, so it is accepted and dropped. Passing it on
        would bind it to TestRunner's ``target`` parameter and collide with
        the ``target`` keyword.
        """
        return TestRunner(*args, **kwargs)
def plus_one(args):
    """
    Add one to the input number and return it.

    Sleeps for a tenth of a second before returning so other threads will
    run, too. Also prints the ``name`` of the persistent object it was
    handed, which shows which instance served the call.

    Arguments come in one positional argument: a ``(num, persistent)`` tuple.
    """
    num, persistent = args
    print(persistent.name)
    time.sleep(0.1)
    return num + 1
if __name__ == '__main__':
    # Two workers, so two persistent objects share the ten tasks.
    pool = PersistentThreadPool(2)
    result = None
    try:
        result = pool.map(plus_one, range(10))
    except Exception:
        print("Error")
        raise
    finally:
        # Shut the pool down so the workers receive their stop sentinel,
        # leave the ``with PersistentObject()`` block and run its cleanup.
        # join() waits for that to finish before the script continues.
        pool.close()
        pool.join()
    print("Done")
    print("Result: %s" % result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment