Skip to content

Instantly share code, notes, and snippets.

@mengzhuo
Created April 10, 2014 10:08
Show Gist options
  • Save mengzhuo/10364391 to your computer and use it in GitHub Desktop.
Save mengzhuo/10364391 to your computer and use it in GitHub Desktop.
Streamer
import zmq
def main():
    """Run a ZeroMQ STREAMER device between producers and consumers.

    Binds a PULL socket on ``tcp://*:5559`` (the side producers connect to)
    and a PUSH socket on ``tcp://*:5560`` (the side consumers connect to),
    then hands both to ``zmq.device``. Any exception is printed rather than
    propagated, and every socket/context that was successfully created is
    released before returning.
    """
    # Pre-bind the names so the cleanup below can tell what was actually
    # created; otherwise a failure in Context()/socket()/bind() would make
    # the ``finally`` clause raise UnboundLocalError.
    context = None
    frontend = None
    backend = None
    try:
        context = zmq.Context(1)
        # Socket facing clients
        frontend = context.socket(zmq.PULL)
        frontend.bind("tcp://*:5559")
        # Socket facing services
        backend = context.socket(zmq.PUSH)
        backend.bind("tcp://*:5560")
        # NOTE(review): pyzmq documents zmq.device as deprecated in favour of
        # zmq.proxy on libzmq >= 3.2 -- confirm before switching.
        zmq.device(zmq.STREAMER, frontend, backend)
    except Exception as e:
        print(e)
        print("bringing down zmq device")
    finally:
        # Close sockets before terminating the context that owns them.
        if frontend is not None:
            frontend.close()
        if backend is not None:
            backend.close()
        if context is not None:
            context.term()


if __name__ == "__main__":
    main()
import time
import zmq
def producer():
    """Push 20000 JSON work messages to the streamer's client-facing port.

    Connects a PUSH socket to ``tcp://127.0.0.1:5559`` and sends
    ``{'num': n}`` for ``n`` in ``0..19999``. The socket and context are
    released when the loop finishes or is interrupted.
    """
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    try:
        zmq_socket.connect("tcp://127.0.0.1:5559")
        # Start your result manager and workers before you start your producers
        # range() instead of xrange() so the script runs on Python 2 and 3.
        for num in range(20000):
            work_message = {'num': num}
            zmq_socket.send_json(work_message)
            # NOTE(review): the original indentation was lost; this assumes
            # the one-second pause is per message (throttling) rather than a
            # single pause after the loop -- confirm against the author's
            # intent.
            time.sleep(1)
    finally:
        # Release the socket before its context. term() may wait for queued
        # messages to be flushed, depending on the socket's linger setting.
        zmq_socket.close()
        context.term()


producer()
import sys
import time
import zmq
import random
def consumer():
    """Pull JSON work messages from the streamer and print each one.

    Picks a random consumer id in ``[1, 10005)``, connects a PULL socket to
    ``tcp://127.0.0.1:5560`` and loops forever, printing
    ``{'consumer': <id>, 'num': <value>}`` for every message received. The
    socket and context are released when the loop is interrupted or a
    receive fails.
    """
    consumer_id = random.randrange(1, 10005)
    print("I am consumer #%s" % (consumer_id))
    context = zmq.Context()
    # receive work
    consumer_receiver = context.socket(zmq.PULL)
    try:
        consumer_receiver.connect("tcp://127.0.0.1:5560")
        while True:
            work = consumer_receiver.recv_json()
            data = work['num']
            result = {'consumer': consumer_id, 'num': data}
            print(result)
    finally:
        # The loop only ends via an exception (e.g. Ctrl-C); make sure the
        # socket and its context are still torn down in that case.
        consumer_receiver.close()
        context.term()


consumer()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment