Created
April 2, 2012 20:12
-
-
Save dvarrazzo/2286899 to your computer and use it in GitHub Desktop.
Syncing over TCP appears to be working
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
This script is meant to be a test of synchronizing zmq sockets.
Synchronization fails even though each thread sets up its subscriber before
signaling that it is ready.
""" | |
import sys | |
import random | |
if 0: | |
from eventlet.green import threading | |
from eventlet.green import zmq | |
from eventlet.green import time | |
else: | |
import threading | |
import zmq | |
import time | |
def main():
    """
    Publish a short run of messages to two subscriber threads.

    Bind a PUB socket on tcp port 9999 and a REP control socket on tcp port
    9998, start one worker thread for each label in 'AB' and wait for that
    worker's 'ready' handshake on the control socket.  Then publish ten
    messages followed by the 'END' markers, join the threads and shut the
    context down.

    Raises RuntimeError if a handshake request does not carry the label of
    the worker that was just started.
    """
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUB)
    s.bind('tcp://*:9999')

    # have a socket to receive a 'ready' message from the worker thread
    ctrl = ctx.socket(zmq.REP)
    ctrl.bind('tcp://*:9998')

    # fire a couple of threads to interact with
    threads = []
    for name in 'AB':
        # fire the working thread
        t = threading.Thread(target=worker, args=(ctx, name))
        threads.append(t)
        t.start()

        # wait for a signal that the thread's subscriber is ready listening
        print("waiting for %s to be ready" % name)
        # The receive must not live inside an assert: "python -O" strips
        # asserts, which would skip the recv and make the send below fail
        # because the REP socket has no pending request to answer.
        ready = ctrl.recv()
        if ready != name:
            raise RuntimeError(
                "expected ready signal from %r, got %r" % (name, ready))
        ctrl.send('ok')
        print("%s is ready" % name)

    ctrl.close()

    # enabling this pause it works as expected
    # time.sleep(1)

    print("sending")
    for i in range(10):
        # s.send(random.choice('AAB'), zmq.SNDMORE)
        s.send('message %d' % i)

    # one 'END' marker per worker label
    for _ in 'AB':
        # s.send(id, zmq.SNDMORE)
        s.send('END')

    for i, t in enumerate(threads):
        print("waiting for thread %s to finish" % i)
        t.join()

    print("closing")
    s.close()
    ctx.term()
def worker(ctx, id):
    """
    A worker thread subscribed on a channel.

    Connect a SUB socket to the publisher on tcp port 9999 and subscribe to
    every message, tell the main thread over a REQ socket on tcp port 9998
    that the subscriber has been set up, then print each message received
    until 'END' arrives.

    :param ctx: the zmq context created by the main thread
    :param id: label sent in the ready handshake and used to tag the output
    :raises RuntimeError: if the reply to the handshake is not 'ok'
    """
    # start the subscriber
    s = ctx.socket(zmq.SUB)
    try:
        s.connect('tcp://localhost:9999')
        s.setsockopt(zmq.SUBSCRIBE, '')
        # s.setsockopt(zmq.IDENTITY, id)
        # time.sleep(1)     # another way to make it work
        print("%s connected" % id)

        # signal the main thread we are listening ok
        ctrl = ctx.socket(zmq.REQ)
        try:
            ctrl.connect('tcp://localhost:9998')
            ctrl.send(id)
            # Receive outside of any assert: "python -O" strips asserts and
            # the reply would otherwise never be read.
            reply = ctrl.recv()
        finally:
            ctrl.close()
        if reply != 'ok':
            raise RuntimeError(
                "%s: unexpected handshake reply %r" % (id, reply))

        i = 0
        while True:
            i += 1
            print("%s receiving" % id)
            req = s.recv()
            print("%s received: %s %s" % (id, i, req))
            if req == 'END':
                break
            # presumably a cooperative yield, relevant when the eventlet
            # green modules are enabled at the top of the file
            time.sleep(0)

        print("%s closing" % id)
    finally:
        # always release the subscriber so it is not left open on the
        # shared context when this thread fails
        s.close()
if __name__ == '__main__':
    # Run the demo and hand main()'s return value to the interpreter as the
    # process exit status.
    status = main()
    sys.exit(status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment