

@cupdike
Created January 10, 2023 16:30
Multiprocessing Pool Using Process Subclass with Custom Attributes
```python
import multiprocessing as mp
import time
from multiprocessing.pool import Pool

# GOAL IN CONTEXT:
# Simulate using a multiprocessing pool to download a list of files in parallel
# from a set of servers, where each worker in the pool targets a specific
# download server.
# Our Worker subclasses Process so the target server can be added as an attribute.
# CustPool subclasses Pool so that our Worker subclass is used instead of Process.


# A worker (Process subclass) with a custom attribute: the server URL it targets.
class Worker(mp.Process):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Assign any attributes to the worker...
        # Note: __init__ runs in the parent process, so this reads the
        # module-level server_urls list defined in the __main__ block below.
        # Each worker takes one URL; a replacement worker (for example with
        # maxtasksperchild set, or after a worker process dies) would find
        # the list empty and raise IndexError.
        self.server_url = server_urls.pop()
        print(f"Custom worker constructed for server {self.server_url}.")


# A custom pool that uses our Worker subclass.
class CustPool(Pool):  # NOT mp.Pool, which is a method (not a class)
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        super().__init__(processes, initializer, initargs,
                         maxtasksperchild, context)

    # Override the Process factory to return our custom Worker.
    # (This matches the Pool.Process staticmethod hook in Python 3.8+.
    # ctx is ignored, so Worker always uses the default start method.)
    @staticmethod
    def Process(ctx, *args, **kwds):
        return Worker(*args, **kwds)


def download_task(file_name):
    p = mp.current_process()
    print(f"{p.name} for {p.server_url} is downloading file {file_name}", flush=True)
    # Simulate a bit of work so the tasks are spread across all the workers.
    # Otherwise, one worker may finish all the tasks before the others get any.
    time.sleep(.1)


if __name__ == '__main__':
    # Simulate downloading some files from a set of servers where
    # each server has a dedicated worker.
    nServers = 3
    nFiles = 7
    files_to_download = [f"File{f}" for f in range(1, 1 + nFiles)]
    server_urls = [f"https://myfileserver{i}" for i in range(1, 1 + nServers)]
    with CustPool(processes=nServers) as pool:
        pool.map(download_task, files_to_download)

""" STDOUT:
Custom worker constructed for server https://myfileserver3.
Custom worker constructed for server https://myfileserver2.
Custom worker constructed for server https://myfileserver1.
Worker-1 for https://myfileserver3 is downloading file File1
Worker-2 for https://myfileserver2 is downloading file File2
Worker-3 for https://myfileserver1 is downloading file File3
Worker-1 for https://myfileserver3 is downloading file File4
Worker-2 for https://myfileserver2 is downloading file File5
Worker-3 for https://myfileserver1 is downloading file File6
Worker-1 for https://myfileserver3 is downloading file File7
"""
```
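The gist overrides Pool.Process, which, as far as I know, is an internal hook rather than part of the documented Pool API. A common alternative that sticks to documented parameters is Pool's initializer/initargs: each worker process runs the initializer once at startup, so it can claim a server URL and keep it in a module-level global. Below is a minimal sketch of that swapped-in technique, not the gist's approach. The names init_worker, run_downloads and _server_url are hypothetical, and handing each worker a distinct URL through a multiprocessing.Queue is an assumption of the sketch.

```python
import multiprocessing as mp
import time

# Hypothetical per-process global, set once in each worker by init_worker.
_server_url = None


def init_worker(url_queue):
    """Pool initializer: runs once in each worker process at startup."""
    global _server_url
    _server_url = url_queue.get()  # each worker takes a different URL


def download_task(file_name):
    """Simulated download that reports which server this worker targets."""
    time.sleep(0.1)  # simulate work so the tasks spread across the workers
    return mp.current_process().name, _server_url, file_name


def run_downloads(server_urls, files):
    url_queue = mp.Queue()
    for url in server_urls:
        url_queue.put(url)
    # One worker per server; maxtasksperchild is left unset, so no
    # replacement workers should ever need a URL.
    with mp.Pool(processes=len(server_urls), initializer=init_worker,
                 initargs=(url_queue,)) as pool:
        return pool.map(download_task, files)


if __name__ == "__main__":
    servers = [f"https://myfileserver{i}" for i in range(1, 4)]
    files = [f"File{f}" for f in range(1, 8)]
    for name, url, file_name in run_downloads(servers, files):
        print(f"{name} for {url} downloaded file {file_name}")
```

Here the URL lives in a per-process global instead of on the Process object, and pool.map returns the results in input order, so the printing happens in the parent once the work is done. The replacement-worker caveat still applies: a worker started after the queue is drained would block in url_queue.get(), which is why the sketch assumes maxtasksperchild is left unset.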

cupdike commented Jan 10, 2023


Litchilitchy commented Feb 6, 2023

Thanks for the great work!

Just curious about one thing: in the Worker class, if we implement (override) the run method, the program runs into an infinite __init__() and run() loop.

Update: Yes, it should be fine; I had forgotten to call super().run(). The code below works well.


cupdike commented Feb 6, 2023

Works fine for me if I override Worker.run like this:

    def run(self):
        print("Hello from subclassed run()")
        return super(Worker, self).run()

Wild guess, but make sure you are launching CustPool under an if __name__ == '__main__': block.
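A minimal sketch of the same run() override on a standalone Process subclass, to show which code runs where. The server_url keyword argument and the report target are hypothetical, not from the gist. __init__ runs in the parent process (which is why the gist's Worker can read the server_urls list), while run() runs in the child. Process.run() is what calls the target, and for a pool worker that target is the pool's task loop; an override that skips super().run() lets the worker exit at once, and, as far as I can tell, the pool then keeps starting replacement workers, which would account for the __init__()/run() loop reported above.

```python
import multiprocessing as mp


class Worker(mp.Process):
    # server_url is a hypothetical keyword argument for this sketch.
    def __init__(self, *args, server_url=None, **kwargs):
        super().__init__(*args, **kwargs)  # runs in the parent process
        self.server_url = server_url

    def run(self):
        # Runs in the child process, before the target function is called.
        print(f"{self.name} starting for {self.server_url}", flush=True)
        # Process.run() is what invokes target(*args, **kwargs);
        # skipping this call means the target never runs.
        super().run()


def report(queue, label):
    # Hypothetical target: send this worker's attribute back to the parent.
    queue.put((mp.current_process().server_url, label))


if __name__ == "__main__":
    q = mp.Queue()
    w = Worker(target=report, args=(q, "hello"),
               server_url="https://myfileserver1")
    w.start()
    print(q.get())  # ('https://myfileserver1', 'hello')
    w.join()
```

Reading from the queue before join() is deliberate: it avoids waiting on a child that may still be flushing queued data.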
