Skip to content

Instantly share code, notes, and snippets.

@cupdike
Created January 10, 2023 16:30
Show Gist options
  • Save cupdike/03d723cec6c5384b49e5c43785eb3131 to your computer and use it in GitHub Desktop.
Save cupdike/03d723cec6c5384b49e5c43785eb3131 to your computer and use it in GitHub Desktop.
Multiprocessing Pool Using Process Subclass with Custom Attributes
import multiprocessing as mp
from multiprocessing.pool import Pool
# GOAL IN CONTEXT:
# Simulate using a multiprocessing pool to download a list of files concurrently
# from a set of servers where each worker in the pool targets a specific
# download server.
# Our Worker subclasses Process so the target server can be added as an attribute.
# A CustPool subclasses Pool so our Worker subclass is used instead of Process.
# A worker (Process subclass) with a custom attribute (the server url it targets).
class Worker(mp.Process):
    """A pool worker process that carries the URL of the server it targets.

    The URL is assigned in ``__init__``, i.e. when the Process object is
    built. That happens before the process is started, so the attribute is
    available to tasks through ``mp.current_process().server_url``.

    By default the URL is popped from the module-level ``server_urls`` list,
    which must be bound before the pool creates its workers. Pass
    ``server_url=`` explicitly to build a Worker without relying on that
    global.

    Raises:
        IndexError: ``server_urls`` is empty, i.e. more workers were requested
            than there are servers.
    """

    def __init__(self, *args, server_url=None, **kwargs):
        super().__init__(*args, **kwargs)
        if server_url is None:
            try:
                server_url = server_urls.pop()
            except IndexError:
                # An empty pop means more workers were created than there are
                # servers (processes > number of URLs, or a worker was replaced
                # e.g. after maxtasksperchild). Say so instead of
                # "pop from empty list".
                raise IndexError(
                    "server_urls is empty: more workers were requested than "
                    "there are server URLs"
                ) from None
        self.server_url = server_url
        print(f"Custom worker constructed for server {self.server_url}.")
# A custom pool that uses our Worker subclass.
class CustPool(Pool):  # NOT mp.Pool, which is a factory method (not a class)
    """A ``multiprocessing.pool.Pool`` whose worker processes are ``Worker``s.

    Pool builds each worker by calling its ``Process`` static method, so
    overriding that hook is all that is needed. The constructor is inherited
    unchanged from ``Pool``. It takes the same ``processes``, ``initializer``,
    ``initargs``, ``maxtasksperchild`` and ``context`` parameters the previous
    pass-through ``__init__`` forwarded.
    """

    @staticmethod
    def Process(ctx, *args, **kwds):
        """Return a ``Worker`` instead of ``ctx.Process(*args, **kwds)``.

        NOTE(review): ``ctx`` is ignored. ``Worker`` subclasses ``mp.Process``,
        so it starts with the default start method even if the pool was given
        a different ``context=``. Confirm before passing a non-default
        context.
        """
        return Worker(*args, **kwds)
def download_task(file_name):
    """Pretend to download *file_name* from the server bound to this worker.

    The server URL is read from ``server_url`` on the current process. Any
    process without that attribute (i.e. one that is not a ``Worker``) raises
    ``AttributeError``.
    """
    import time

    worker = mp.current_process()
    message = f"{worker.name} for {worker.server_url} is downloading file {file_name}"
    print(message, flush=True)
    # Brief pause standing in for real download time. Without it, the first
    # worker to pick up tasks can drain the whole queue before the other
    # workers get a turn.
    time.sleep(0.1)
if __name__ == '__main__':
    # Simulate downloading some files from a set of servers, with one
    # dedicated worker per server.
    nServers, nFiles = 3, 7
    # Worker.__init__ pops from this module-level list, so it has to be
    # bound at module scope before the pool builds its workers.
    server_urls = [f"https://myfileserver{n}" for n in range(1, nServers + 1)]
    files_to_download = [f"File{n}" for n in range(1, nFiles + 1)]
    with CustPool(processes=nServers) as pool:
        pool.map(download_task, files_to_download)

# Sample STDOUT (the order in which workers pick up files may vary per run):
# Custom worker constructed for server https://myfileserver3.
# Custom worker constructed for server https://myfileserver2.
# Custom worker constructed for server https://myfileserver1.
# Worker-1 for https://myfileserver3 is downloading file File1
# Worker-2 for https://myfileserver2 is downloading file File2
# Worker-3 for https://myfileserver1 is downloading file File3
# Worker-1 for https://myfileserver3 is downloading file File4
# Worker-2 for https://myfileserver2 is downloading file File5
# Worker-3 for https://myfileserver1 is downloading file File6
# Worker-1 for https://myfileserver3 is downloading file File7
@cupdike
Copy link
Author

cupdike commented Feb 6, 2023

Works fine for me if I override Worker.run like this:

    def run(self):
        print("Hello from subclassed run()")
        return super(Worker, self).run()

Wild guess, but make sure you are launching CustPool under an `if __name__ == '__main__':` block.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment