Using Python's Multiprocessing Managers to share a queue between hosts

To share a queue between hosts with Python, you can use a "SyncManager". There'll be one machine that's the manager/owner of the queue, and then multiple other machines can connect to that queue to pull jobs. You can also share multiprocessing Events (for example a "shutdown" Event to tell all the workers on the queue that they should stop).

First, make an empty class that's a subclass of the multiprocessing.managers.SyncManager:

import multiprocessing.managers


class CentralManager(multiprocessing.managers.SyncManager):
    """
    CentralManager:

    A SyncManager subclass. It synchronizes shared objects across multiple
    hosts; in our case it is used to offer shared input & output queues.
    """
    pass

Next, on the manager box, you'll need a function like:

def make_central_manager(port, address, authkey):
    """
    make_central_manager(port, address, authkey):

    Creates the shared queue, makes a SyncManager to offer it to
    other hosts, and starts the manager. Returns an instance of
    CentralManager.

    port - the port for the manager to listen on
    address - the IP address for the manager to listen on
    authkey - the key (a bytes object) that workers must provide to connect to this manager.
    """
    port = int(port)
    queue = multiprocessing.Queue()
    manager = CentralManager(address=(address, port), authkey=authkey)
    # Register a callable that hands back the shared queue; this must happen
    # before the manager is started.
    manager.register("get_queue", callable=lambda: queue)
    manager.daemon = True
    manager.start()
    return manager

This creates an instance of the manager, adds authentication to it with the authkey, registers a get_queue function that returns the shared queue, and starts the manager.
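As a quick usage sketch, the manager host could then be started like this (the port, bind address, and authkey below are placeholder values, not part of the original gist):

manager = make_central_manager(50000, "0.0.0.0", b"secret-key")
jobs = manager.get_queue()         # proxy for the shared queue
for job in ("job-1", "job-2", "job-3"):
    jobs.put(job)                  # workers on other hosts will pull these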

On each of the clients, you'll then have a function like:

def make_client_manager(port, address, auth_key):
    """
    make_client_manager(port, address, auth_key):

    Connects to a remote SyncManager to get our queues.

    port - the port the manager is listening on
    address - the IP address of the manager
    auth_key - the key that workers must provide to connect to the manager.
    """
    port = int(port)
    manager = CentralManager(address=(address, port), authkey=auth_key)
    # Register the name only; the callable lives on the server side.
    manager.register("get_queue")
    manager.connect()
    return manager

This returns a client instance of the manager. The clients can then call manager.get_queue(), which returns a proxy for the shared queue that they can use as if it were a normal multiprocessing queue.
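For example, a worker on a client host might pull jobs in a loop like this (the address, port, authkey, five-second timeout, and the handle_job function are all assumptions of this sketch, not part of the original):

import queue

manager = make_client_manager(50000, "192.0.2.10", b"secret-key")
jobs = manager.get_queue()         # proxy for the queue living on the manager host
while True:
    try:
        job = jobs.get(timeout=5)  # raises queue.Empty if nothing arrives in time
    except queue.Empty:
        continue
    handle_job(job)                # hypothetical: do whatever work the job requires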

Lastly, when your program finishes, you will also want to call manager.shutdown() on the central manager.
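If you also want the shared "shutdown" Event mentioned at the top, it follows the same pattern: create the Event on the manager host and register a second callable for it. A minimal sketch (the get_shutdown_event name is just an illustration, and the registrations have to happen before start() on the server and before connect() on the clients):

# On the manager host, inside make_central_manager, before manager.start():
shutdown_event = multiprocessing.Event()
manager.register("get_shutdown_event", callable=lambda: shutdown_event)

# On each client, inside make_client_manager, before manager.connect():
manager.register("get_shutdown_event")

# Workers can then poll the shared Event and exit cleanly:
if manager.get_shutdown_event().is_set():
    ...  # stop pulling jobs and shut down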
