To share a queue between hosts with Python, you can use a "SyncManager". There'll be one machine that's the manager/owner of the queue, and then multiple other machines can connect to that queue to pull jobs. You can also share multiprocessing Events (for example a "shutdown" Event to tell all the workers on the queue that they should stop).
First, make an empty class that's a subclass of the multiprocessing.managers.SyncManager:
class CentralManager(multiprocessing.managers.SyncManager):
"""
CentralManager:
A SyncManager class. This synchronizes a shared object across multiple
hosts. This is used in our case to offer shared input & output queues.
"""
pass
Next, on the manager box, you'll need a function like:
def make_central_manager(port, address, authkey):
"""
make_central_manager(port, address, authkey):
Creates the shared queues, makes a SyncManager to offer them to
other hosts, and starts the manager. Returns an instance of
CentralManager.
port - the port for the manager to listen on
address - the IP address for the manager to listen on
authkey - the key that workers must provide to connect to this manager.
"""
port = int(port)
queue = multiprocessing.Queue()
manager = CentralManager(address=(address, port), authkey=authkey)
manager.register("get_queue", callable=lambda: inputqueue)
manager.daemon = True
manager.start()
return manager
This creates an instance of the manager, adds authentication to the manager with the authkey
, registers
a function to it, get_queue
that returns the shared queue, and starts the manager.
On each of the clients, you'll then have a function like:
def make_client_manager(port, address, auth_key):
"""
get_client_manager(port, address, authkey):
Connects to a remote Sync manager to get our queues.
port - the port for the manager to listen on
address - the IP address for the manager to listen on
authkey - the key that workers must provide to connect to this manager.
"""
port = int(port)
manager = CentralManager(address=(address, port), authkey=auth_key)
manager.register("get_queue")
manager.connect()
return manager
Which will return a client instance of the manager. The clients will then call manager.get_queue()
, which will
return an instance of the queue for them to use as if it were a normal multiprocessing queue.
Lastly, as your program finishes, you will also want to call: manager.shutdown()
on the central manager.