Created
October 20, 2011 20:00
-
-
Save espeed/1302163 to your computer and use it in GitHub Desktop.
PyZMQ Streamer Tester
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Note, added LINGER -1 to all sockets. | |
# See UPDATE comment below... | |
import zmq | |
from zmq.devices.basedevice import ProcessDevice | |
from multiprocessing import Process | |
import time | |
in_port = 6000 | |
out_port = 6001 | |
number_of_workers = 1 | |
number_of_messages = 1000 | |
pd = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH) | |
pd.bind_in("tcp://127.0.0.1:%d" % in_port ) | |
pd.bind_out("tcp://127.0.0.1:%d" % out_port) | |
pd.setsockopt_in(zmq.LINGER,-1) | |
pd.setsockopt_out(zmq.LINGER,-1) | |
pd.start() | |
def client(): | |
context = zmq.Context() | |
socket = context.socket(zmq.PUSH) | |
socket.setsockopt(zmq.LINGER,-1) | |
uri = "tcp://127.0.0.1:%d" % in_port | |
socket.connect(uri) | |
for i in range(number_of_messages): | |
socket.send('#%s' % i) | |
# UPDATE: This sleep() keeps the socket open long enough | |
# for the messages to send, but there must be a better way! | |
# What's the right way to handle this??? | |
#time.sleep(5) | |
socket.close() | |
context.term() | |
def worker(work_num): | |
context = zmq.Context() | |
socket = context.socket(zmq.PULL) | |
socket.setsockopt(zmq.LINGER,-1) | |
socket.connect("tcp://127.0.0.1:%d" % out_port) | |
try: | |
while True: | |
message = socket.recv() | |
print "%s got message! %s" % (work_num, message) | |
finally: | |
print "Closing down..." | |
socket.close() | |
context.term() | |
for work_num in range(number_of_workers): | |
Process(target=worker, args=(work_num,)).start() | |
time.sleep(1) | |
client() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment