Skip to content

Instantly share code, notes, and snippets.

@dwf
Last active August 29, 2015 14:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dwf/9a1d77a8dd6bff3e1b0d to your computer and use it in GitHub Desktop.
Save dwf/9a1d77a8dd6bff3e1b0d to your computer and use it in GitHub Desktop.
"""Very simple PUSH-PULL reusable producer-consumer with ZeroMQ."""
# By David Warde-Farley. Released under the 3-clause BSD license.
import time
from multiprocessing import Process
import zmq
def _producer_wrapper(f, port, addr='tcp://127.0.0.1'):
    """A shim that sets up a socket and starts the producer callable.

    Creates a ZeroMQ context and a PUSH socket, connects the socket to
    ``addr:port``, and hands the socket to `f`. The context is destroyed
    when `f` returns or raises.

    Parameters
    ----------
    f : callable
        Callable that takes a single argument, a handle
        for a ZeroMQ PUSH socket. Must be picklable.
    port : int
        The port on which the socket should connect.
    addr : str, optional
        Address to which the socket should connect. Defaults
        to localhost.

    """
    # Create the context *before* entering the `try` block: if context
    # creation itself fails, the `finally` clause must not run, since it
    # would reference an unbound name and replace the real error with an
    # UnboundLocalError.
    context = zmq.Context()
    try:
        socket = context.socket(zmq.PUSH)
        socket.connect(':'.join([addr, str(port)]))
        f(socket)
    finally:
        # Works around a Python 3.x bug.
        context.destroy()
def spawn_producer(f, port, addr='tcp://127.0.0.1'):
    """Launch a child process that runs `f` with a connected PUSH socket.

    The child process executes `_producer_wrapper`, which is given `f`,
    `port` and `addr`. The process is started before this function
    returns.

    Parameters
    ----------
    f : callable
        Callable that takes a single argument, a handle
        for a ZeroMQ PUSH socket. Must be picklable.
    port : int
        Port forwarded to `_producer_wrapper`.
    addr : str, optional
        Address forwarded to `_producer_wrapper`. Defaults to
        ``'tcp://127.0.0.1'``.

    Returns
    -------
    process : multiprocessing.Process
        The process handle of the created (already started) producer
        process.

    """
    worker = Process(
        target=_producer_wrapper,
        args=(f, port),
        kwargs={'addr': addr},
    )
    worker.start()
    return worker
def producer_consumer(producer, consumer, addr='tcp://127.0.0.1',
                      port=None, context=None):
    """A producer-consumer pattern.

    Parameters
    ----------
    producer : callable
        Callable that takes a single argument, a handle
        for a ZeroMQ PUSH socket. Must be picklable.
    consumer : callable
        Callable that takes a single argument, a handle
        for a ZeroMQ PULL socket.
    addr : str, optional
        Address on which the consumer socket is bound, and to which
        the producer connects. Defaults to ``'tcp://127.0.0.1'``. If the
        host part is a wildcard (``*`` or ``0.0.0.0``), the producer
        connects through ``127.0.0.1`` instead.
    port : int, optional
        The port on which the consumer should listen. If omitted, a
        random free port is chosen.
    context : zmq.Context, optional
        The ZeroMQ Context to use. One will be created otherwise.

    Returns
    -------
    result
        Passes along whatever `consumer` returns.

    Notes
    -----
    This sets up a PULL socket in the calling process and starts
    a process that calls `producer` on a PUSH socket. When the
    consumer returns (or raises), the producer process is terminated
    and joined, and the PULL socket is closed.

    Wrap `consumer` or `producer` in a `functools.partial` object
    in order to send additional arguments; the callables passed in
    should expect only one required, positional argument, the socket
    handle.

    """
    context_created = context is None
    if context_created:
        context = zmq.Context()
    try:
        consumer_socket = context.socket(zmq.PULL)
        try:
            if port is None:
                port = consumer_socket.bind_to_random_port(addr)
            else:
                # An explicitly requested port must be bound as well;
                # otherwise the producer has nothing to connect to and
                # the consumer would wait forever.
                consumer_socket.bind('{0}:{1}'.format(addr, port))

            # A wildcard host is valid for binding but not for
            # connecting, so the producer goes through localhost then.
            scheme, sep, host = addr.partition('://')
            if host in ('*', '0.0.0.0'):
                connect_addr = scheme + sep + '127.0.0.1'
            else:
                connect_addr = addr

            # Spawn outside the inner `try`, so that the cleanup below
            # only runs once `process` actually exists.
            process = spawn_producer(producer, port, connect_addr)
            try:
                return consumer(consumer_socket)
            finally:
                process.terminate()
                # Reap the child so that it does not linger as a zombie.
                process.join()
        finally:
            # Needed when the context belongs to the caller: it is not
            # destroyed below, so the socket would otherwise stay open.
            consumer_socket.close()
    finally:
        # Works around a Python 3.x bug.
        if context_created:
            context.destroy()
def send_integers(socket, count=50, delay=1):
    """Example producer: send a count followed by that many squares.

    First sends `count` itself, so that the receiving side knows how
    many messages to expect, and then sends ``i ** 2`` for every ``i``
    in ``range(count)``, sleeping `delay` seconds after each one.

    Parameters
    ----------
    socket : object
        Anything with a ``send_pyobj`` method, e.g. a ZeroMQ PUSH
        socket.
    count : int, optional
        Number of squares to send. Defaults to 50.
    delay : float, optional
        Seconds to sleep after each square is sent. Defaults to 1.

    """
    # Header message: announces how many values follow.
    socket.send_pyobj(count)
    for i in range(count):
        socket.send_pyobj(i ** 2)
        time.sleep(delay)
def receive_integers(socket):
    """Example consumer: read a count, then sum that many values.

    The first object received is taken as the number of values that
    follow. Each subsequent value is added to a running sum, and the
    value is printed together with the sum so far.

    Parameters
    ----------
    socket : object
        Anything with a ``recv_pyobj`` method, e.g. a ZeroMQ PULL
        socket.

    Returns
    -------
    total
        The sum of all values received after the initial count.

    """
    expected = socket.recv_pyobj()
    running_sum = 0
    for _ in range(expected):
        value = socket.recv_pyobj()
        running_sum = running_sum + value
        print(value, running_sum)
    return running_sum
if __name__ == "__main__":
    # Demo: run the example producer in a child process, consume its
    # output in this process, and report the final sum.
    print('total:', producer_consumer(send_integers, receive_integers))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment