Skip to content

Instantly share code, notes, and snippets.

@espeed
Created October 20, 2011 20:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save espeed/1302163 to your computer and use it in GitHub Desktop.
Save espeed/1302163 to your computer and use it in GitHub Desktop.
PyZMQ Streamer Tester
# Note: LINGER is set to -1 (linger indefinitely) on all sockets.
# See the UPDATE comment in client() below.
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process
import time
# Ports used on localhost: client() pushes to in_port, worker() pulls
# from out_port.
in_port = 6000
out_port = 6001
# How many worker processes to launch and how many messages client() sends.
number_of_workers = 1
number_of_messages = 1000

# Build and start the streamer only when executed as a script. Starting a
# process as an import-time side effect breaks multiprocessing start methods
# that re-import the main module inside each child process.
if __name__ == "__main__":
    # Streamer device with a PULL socket on the input side and a PUSH
    # socket on the output side.
    pd = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
    pd.bind_in("tcp://127.0.0.1:%d" % in_port)
    pd.bind_out("tcp://127.0.0.1:%d" % out_port)
    # LINGER -1 on both device sockets, matching the client/worker sockets.
    pd.setsockopt_in(zmq.LINGER, -1)
    pd.setsockopt_out(zmq.LINGER, -1)
    # NOTE(review): pyzmq documents background devices as daemonic by
    # default, so this process is presumably terminated as soon as the main
    # process exits, possibly with messages still queued. That may be the
    # answer to the UPDATE question in client(); setting
    # ``pd.daemon = False`` before start() is worth trying -- confirm.
    pd.start()
def client():
    """Push ``number_of_messages`` numbered messages to the streamer input.

    Connects a PUSH socket to ``in_port`` on localhost, sends the payloads
    ``#0``, ``#1``, ... and then closes the socket and terminates the
    context. The cleanup runs even if connecting or sending raises.
    """
    context = zmq.Context()
    try:
        sock = context.socket(zmq.PUSH)
        try:
            sock.setsockopt(zmq.LINGER, -1)
            uri = "tcp://127.0.0.1:%d" % in_port
            sock.connect(uri)
            for i in range(number_of_messages):
                # Encode explicitly so the payload is bytes on Python 3 too;
                # on Python 2 this leaves the ASCII str unchanged.
                sock.send(('#%s' % i).encode('ascii'))
            # UPDATE: calling time.sleep(5) at this point keeps the socket
            # open long enough for the messages to send, but there must be
            # a better way! What's the right way to handle this?
        finally:
            sock.close()
    finally:
        # Terminate only after the socket is closed.
        context.term()
def worker(work_num):
    """Pull messages from the streamer output and print each one.

    Connects a PULL socket to ``out_port`` on localhost and receives in an
    endless loop, so this only returns by way of an exception (for example
    an interrupt). On the way out the socket is closed and the context is
    terminated.

    Args:
        work_num: Identifier printed in front of every received message.
    """
    context = zmq.Context()
    try:
        sock = context.socket(zmq.PULL)
        try:
            # Setup lives inside the try so a failed connect still releases
            # the socket and context.
            sock.setsockopt(zmq.LINGER, -1)
            sock.connect("tcp://127.0.0.1:%d" % out_port)
            while True:
                message = sock.recv()
                # Single-argument call form: valid as both the Python 2
                # print statement and the Python 3 print function.
                print("%s got message! %s" % (work_num, message))
        finally:
            print("Closing down...")
            sock.close()
    finally:
        context.term()
# Launch the workers and run the client only when executed as a script, so
# that merely importing this module does not spawn processes or send
# messages.
if __name__ == "__main__":
    for work_num in range(number_of_workers):
        Process(target=worker, args=(work_num,)).start()
    # Presumably gives the workers time to connect before the client starts
    # pushing -- TODO confirm the delay is still needed.
    time.sleep(1)
    client()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment