
@memogarcia
Last active July 10, 2023 02:54
Python ZMQ Push Pull Pattern
import pprint

import zmq

def result_collector():
    context = zmq.Context()
    # Sink: pull results from all consumers
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")
    collector_data = {}
    for x in range(1000):
        result = results_receiver.recv_json()
        # Tally how many results each consumer produced
        if result['consumer'] in collector_data:
            collector_data[result['consumer']] += 1
        else:
            collector_data[result['consumer']] = 1
        if x == 999:
            pprint.pprint(collector_data)

result_collector()
import random

import zmq

def consumer():
    consumer_id = random.randrange(1, 10005)
    print("I am consumer #%s" % consumer_id)
    context = zmq.Context()
    # receive work from the producer
    consumer_receiver = context.socket(zmq.PULL)
    consumer_receiver.connect("tcp://127.0.0.1:5557")
    # send results downstream to the collector
    consumer_sender = context.socket(zmq.PUSH)
    consumer_sender.connect("tcp://127.0.0.1:5558")
    while True:
        work = consumer_receiver.recv_json()
        data = work['num']
        result = {'consumer': consumer_id, 'num': data}
        # only forward even numbers
        if data % 2 == 0:
            consumer_sender.send_json(result)

consumer()
import zmq

def producer():
    context = zmq.Context()
    # Push work out to however many consumers connect
    zmq_socket = context.socket(zmq.PUSH)
    zmq_socket.bind("tcp://127.0.0.1:5557")
    # Start your result manager and workers before you start your producers
    for num in range(20000):
        work_message = {'num': num}
        zmq_socket.send_json(work_message)

producer()
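Taken together, the three scripts form a pipeline: producer → consumers → collector. A minimal zmq-free sketch of the same wiring, using threads and queue.Queue in one process (the names and sentinel scheme here are illustrative, not part of the gist):

```python
import queue
import threading

def run_pipeline(n_messages=100, n_consumers=3):
    work_q = queue.Queue()    # producer -> consumers (like port 5557)
    result_q = queue.Queue()  # consumers -> collector (like port 5558)

    def consumer(consumer_id):
        while True:
            num = work_q.get()
            if num is None:  # sentinel: no more work
                break
            if num % 2 == 0:  # same filter as the gist's consumer
                result_q.put({'consumer': consumer_id, 'num': num})

    # Start the workers before producing, as the gist's comment advises
    threads = [threading.Thread(target=consumer, args=(i,))
               for i in range(n_consumers)]
    for t in threads:
        t.start()

    for num in range(n_messages):  # producer
        work_q.put(num)
    for _ in threads:              # one stop-sentinel per consumer
        work_q.put(None)
    for t in threads:
        t.join()

    collected = {}                 # collector: tally results per consumer
    while not result_q.empty():
        r = result_q.get()
        collected[r['consumer']] = collected.get(r['consumer'], 0) + 1
    return collected

counts = run_pipeline()
print(sum(counts.values()))  # 50 even numbers in range(100)
```

Unlike the zmq version, this sketch terminates cleanly via sentinels; the gist's consumer loops forever and must be killed by hand.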

memogarcia commented Sep 8, 2016

Push/Pull pattern

Push and Pull sockets let you distribute messages to multiple workers arranged in a pipeline. A PUSH socket distributes sent messages to its PULL peers evenly. This is similar to the producer/consumer model, except that the results computed by the consumers are not sent back upstream to the producer, but downstream to another PULL socket (the collector).
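The even distribution works because a PUSH socket round-robins outgoing messages across its connected PULL peers. A plain-Python sketch of that dealing rule (no zmq involved; `push_round_robin` is an illustrative name):

```python
from itertools import cycle

def push_round_robin(messages, n_workers):
    """Mimic zmq.PUSH fan-out: deal each message to the next worker in turn."""
    inboxes = [[] for _ in range(n_workers)]
    turn = cycle(range(n_workers))
    for msg in messages:
        inboxes[next(turn)].append(msg)
    return inboxes

inboxes = push_round_robin(range(12), 3)
print([len(box) for box in inboxes])  # [4, 4, 4] -- an even split
```

In the real sockets, ZeroMQ only round-robins among peers that are connected and ready, so slow or late-joining workers can receive fewer messages than this idealized picture suggests.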

Data always flows down the pipeline, and each stage of the pipeline is connected to at least one node. When a pipeline stage is connected to multiple nodes data is load-balanced among all connected nodes.
