@memogarcia

memogarcia/collector.py

Last active Sep 8, 2016
Python ZMQ Push Pull Pattern
# Result collector: tallies how many results each consumer sent.
import pprint

import zmq


def result_collector():
    context = zmq.Context()
    # PULL socket that gathers results from every consumer
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")
    # Maps consumer id -> number of results received from that consumer
    collector_data = {}
    for _ in range(1000):
        result = results_receiver.recv_json()
        consumer = result['consumer']
        collector_data[consumer] = collector_data.get(consumer, 0) + 1
    # Print the per-consumer tally once 1000 results have arrived
    pprint.pprint(collector_data)


result_collector()
# Consumer (worker): pulls work from the producer and pushes results to the collector.
import random

import zmq


def consumer():
    consumer_id = random.randrange(1, 10005)
    print("I am consumer #%s" % consumer_id)
    context = zmq.Context()
    # Receive work from the producer
    consumer_receiver = context.socket(zmq.PULL)
    consumer_receiver.connect("tcp://127.0.0.1:5557")
    # Send results to the collector
    consumer_sender = context.socket(zmq.PUSH)
    consumer_sender.connect("tcp://127.0.0.1:5558")
    while True:
        work = consumer_receiver.recv_json()
        data = work['num']
        result = {'consumer': consumer_id, 'num': data}
        # Only even numbers are forwarded downstream
        if data % 2 == 0:
            consumer_sender.send_json(result)


consumer()
# Producer: pushes 20000 work messages to the connected consumers.
import zmq


def producer():
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    zmq_socket.bind("tcp://127.0.0.1:5557")
    # Start your result collector and consumers before you start the producer
    for num in range(20000):
        work_message = {'num': num}
        zmq_socket.send_json(work_message)


producer()
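
The note in the producer about starting the collector and consumers first can be illustrated with a small single-process sketch. It is not part of the original scripts: it assumes Python 3 with pyzmq, swaps the `tcp://` endpoints for a hypothetical `inproc://late-joiner-demo` endpoint so everything runs in one process, and the function name `late_joiner_demo` is made up for illustration. Under those assumptions, a PULL socket that connects after the work has been sent should see none of it, because the PUSH socket has already handed every message to the peer that was connected at send time.

```python
import zmq

# Hypothetical in-process endpoint, used only for this illustration
ENDPOINT = "inproc://late-joiner-demo"


def drain(sock, timeout_ms=100):
    """Count the messages currently available on a PULL socket."""
    count = 0
    # poll() returns 0 when nothing arrives within the timeout
    while sock.poll(timeout=timeout_ms):
        sock.recv_json()
        count += 1
    return count


def late_joiner_demo(num_tasks=10):
    context = zmq.Context()
    push = context.socket(zmq.PUSH)
    push.bind(ENDPOINT)

    # This consumer is connected before any work is sent
    early = context.socket(zmq.PULL)
    early.connect(ENDPOINT)

    for num in range(num_tasks):
        push.send_json({"num": num})

    # This consumer joins only after all the work has been dispatched
    late = context.socket(zmq.PULL)
    late.connect(ENDPOINT)

    early_count = drain(early)
    late_count = drain(late)

    for sock in (push, early, late):
        sock.close(linger=0)
    context.term()
    return early_count, late_count


if __name__ == "__main__":
    early_count, late_count = late_joiner_demo()
    print("early consumer:", early_count, "late consumer:", late_count)
```

With the separate `tcp://` scripts the same effect shows up as one consumer receiving most of the work when the others are started late, which is why the start order matters.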
@memogarcia memogarcia commented Sep 8, 2016

Push/Pull pattern

PUSH and PULL sockets let you distribute messages to multiple workers arranged in a pipeline. A PUSH socket distributes the messages it sends evenly (round-robin) among its connected PULL peers. This is equivalent to the producer/consumer model, except that the results computed by a consumer are not sent back upstream but downstream to another PULL socket (the collector).

Data always flows down the pipeline, and each stage of the pipeline is connected to at least one node. When a pipeline stage is connected to multiple nodes, data is load-balanced among all connected nodes.
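
The load balancing described above can be observed in a single process. What follows is a minimal sketch rather than the gist's own code: it assumes Python 3 with pyzmq, uses threads and hypothetical `inproc://` endpoints (`TASKS_URL`, `RESULTS_URL`) in place of the separate scripts and `tcp://` addresses, and the names `worker` and `run_pipeline` are invented for illustration. Unlike the consumer above, the workers here forward every number so the totals are easy to check.

```python
import threading
from collections import Counter

import zmq

# Hypothetical in-process endpoints, used only for this illustration
TASKS_URL = "inproc://pipeline-tasks"
RESULTS_URL = "inproc://pipeline-results"


def worker(context, worker_id, ready, stop):
    """Pull tasks, tag them with this worker's id, push them downstream."""
    receiver = context.socket(zmq.PULL)
    receiver.connect(TASKS_URL)
    sender = context.socket(zmq.PUSH)
    sender.connect(RESULTS_URL)
    ready.wait()  # signal that this worker is connected
    while not stop.is_set():
        # Poll with a timeout so the stop flag is checked regularly
        if receiver.poll(timeout=50):
            work = receiver.recv_json()
            sender.send_json({"worker": worker_id, "num": work["num"]})
    receiver.close(linger=0)
    sender.close(linger=0)


def run_pipeline(num_tasks=30, num_workers=3):
    context = zmq.Context()
    producer = context.socket(zmq.PUSH)
    producer.bind(TASKS_URL)
    collector = context.socket(zmq.PULL)
    collector.bind(RESULTS_URL)

    ready = threading.Barrier(num_workers + 1)
    stop = threading.Event()
    threads = [
        threading.Thread(target=worker, args=(context, i, ready, stop))
        for i in range(num_workers)
    ]
    for thread in threads:
        thread.start()
    ready.wait()  # do not send until every worker has connected

    for num in range(num_tasks):
        producer.send_json({"num": num})

    # Tally how many results each worker produced
    counts = Counter()
    for _ in range(num_tasks):
        counts[collector.recv_json()["worker"]] += 1

    stop.set()
    for thread in threads:
        thread.join()
    producer.close(linger=0)
    collector.close(linger=0)
    context.term()
    return dict(counts)


if __name__ == "__main__":
    print(run_pipeline())
```

The printed dictionary maps each worker id to the number of tasks it handled. Waiting on the barrier before sending gives every worker a chance to receive a share of the work.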
