Last active
August 29, 2015 14:25
-
-
Save dwf/9a1d77a8dd6bff3e1b0d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Very simple PUSH-PULL reusable producer-consumer with ZeroMQ.""" | |
# By David Warde-Farley. Released under the 3-clause BSD license.
import time | |
from multiprocessing import Process | |
import zmq | |
def _producer_wrapper(f, port, addr='tcp://127.0.0.1'):
    """A shim that sets up a socket and starts the producer callable.

    Parameters
    ----------
    f : callable
        Callable that takes a single argument, a handle
        for a ZeroMQ PUSH socket. Must be picklable.
    port : int
        The port on which the socket should connect.
    addr : str, optional
        Address to which the socket should connect. Defaults
        to localhost.

    Notes
    -----
    A fresh ZeroMQ context is created for the call and destroyed when
    `f` returns or raises.

    """
    # Create the context *before* entering the ``try``: if construction
    # fails there is nothing to clean up, and the ``finally`` clause must
    # not hit a NameError on ``context`` that would mask the real error.
    context = zmq.Context()
    try:
        socket = context.socket(zmq.PUSH)
        socket.connect(':'.join([addr, str(port)]))
        f(socket)
    finally:
        # Works around a Python 3.x bug.
        context.destroy()
def spawn_producer(f, port, addr='tcp://127.0.0.1'):
    """Launch `f` in a separate process, feeding it a PUSH socket.

    Parameters
    ----------
    f : callable
        Callable that takes a single argument, a handle
        for a ZeroMQ PUSH socket. Must be picklable.
    port : int
        Port handed to the producer shim for its socket connection.
    addr : str, optional
        Address handed to the producer shim for its socket connection.
        Defaults to localhost.

    Returns
    -------
    process : multiprocessing.Process
        Handle of the producer process, which has already been started.

    """
    shim_args = (f, port, addr)
    worker = Process(target=_producer_wrapper, args=shim_args)
    worker.start()
    return worker
def producer_consumer(producer, consumer, addr='tcp://127.0.0.1',
                      port=None, context=None):
    """A producer-consumer pattern.

    Parameters
    ----------
    producer : callable
        Callable that takes a single argument, a handle
        for a ZeroMQ PUSH socket. Must be picklable.
    consumer : callable
        Callable that takes a single argument, a handle
        for a ZeroMQ PULL socket.
    addr : str, optional
        Address on which the consumer socket is bound and to which the
        producer connects. Defaults to localhost. For a wildcard bind
        address (``*`` or ``0.0.0.0``) the producer connects to
        ``127.0.0.1`` over the same transport instead.
    port : int, optional
        The port on which the consumer should listen. If omitted, a
        random free port is chosen.
    context : zmq.Context, optional
        The ZeroMQ Context to use. One will be created otherwise.

    Returns
    -------
    result
        Passes along whatever `consumer` returns.

    Notes
    -----
    This sets up a PULL socket in the calling process and starts
    a process that calls `producer` on a PUSH socket. When the
    consumer returns (or raises), the producer process is terminated
    and joined, and the PULL socket is closed. The context is destroyed
    only if it was created here.

    Wrap `consumer` or `producer` in a `functools.partial` object
    in order to send additional arguments; the callables passed in
    should expect only one required, positional argument, the socket
    handle.

    """
    context_created = context is None
    if context_created:
        context = zmq.Context()
    # Pre-declare so the cleanup below can tell what was actually set up.
    consumer_socket = None
    process = None
    try:
        consumer_socket = context.socket(zmq.PULL)
        if port is None:
            port = consumer_socket.bind_to_random_port(addr)
        else:
            # An explicitly requested port still has to be bound, or the
            # consumer would be listening on nothing.
            consumer_socket.bind(':'.join([addr, str(port)]))
        # A wildcard address is valid for bind but not for connect; the
        # producer is a local process, so point it at the loopback.
        scheme, sep, host = addr.partition('://')
        connect_addr = addr
        if sep and host in ('*', '0.0.0.0'):
            connect_addr = scheme + sep + '127.0.0.1'
        process = spawn_producer(producer, port, connect_addr)
        return consumer(consumer_socket)
    finally:
        if process is not None:
            process.terminate()
            # Reap the child so it does not linger as a zombie.
            process.join()
        if consumer_socket is not None:
            # Needed when the caller owns the context; harmless otherwise.
            consumer_socket.close()
        # Works around a Python 3.x bug.
        if context_created:
            context.destroy()
def send_integers(socket):
    """Example producer: announce a count, then send that many squares.

    Parameters
    ----------
    socket : object
        Handle exposing ``send_pyobj``; used here as a ZeroMQ PUSH socket.

    Notes
    -----
    The first message is the number of values that follow (50). Each
    subsequent message is ``i ** 2`` for ``i`` in ``range(50)``, with a
    one second pause after every value.

    """
    count = 50
    socket.send_pyobj(count)
    for square in (n ** 2 for n in range(count)):
        socket.send_pyobj(square)
        # Pace the stream so the consumer visibly receives values one by one.
        time.sleep(1)
def receive_integers(socket):
    """Example consumer: read a count, then sum that many received values.

    Parameters
    ----------
    socket : object
        Handle exposing ``recv_pyobj``; used here as a ZeroMQ PULL socket.

    Returns
    -------
    total
        Sum of the values received after the initial count message.

    Notes
    -----
    Prints each received value together with the running sum.

    """
    expected = socket.recv_pyobj()
    running_sum = 0
    for _ in range(expected):
        value = socket.recv_pyobj()
        running_sum = running_sum + value
        print(value, running_sum)
    return running_sum
if __name__ == "__main__":
    # Demo: run the example producer in a child process, consume its
    # output in this process, and report the sum the consumer computed.
    print('total:', producer_consumer(send_integers, receive_integers))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment