Skip to content

Instantly share code, notes, and snippets.

@dvarrazzo
Created April 2, 2012 20:12
Show Gist options
  • Save dvarrazzo/2286899 to your computer and use it in GitHub Desktop.
Save dvarrazzo/2286899 to your computer and use it in GitHub Desktop.
syncing over tcp appears working
#!/usr/bin/env python
"""
This script should be a test in synchronizing zmq sockets.
Synchronization fails even if the threads prepare the subscriber before
signaling they are ready.
"""
import sys
import random
if 0:
from eventlet.green import threading
from eventlet.green import zmq
from eventlet.green import time
else:
import threading
import zmq
import time
def main():
ctx = zmq.Context()
s = ctx.socket(zmq.PUB)
s.bind('tcp://*:9999')
# have a socket to receive a 'ready' message from the worker thread
ctrl = ctx.socket(zmq.REP)
ctrl.bind('tcp://*:9998')
# fire a couple of threads to interact with
threads = []
for id in 'AB':
# fire the working thread
t = threading.Thread(target=worker, args=(ctx, id))
threads.append(t)
t.start()
# wait for a signal that the thread's subscriber is ready listening
print "waiting for %s to be ready" % id
assert id == ctrl.recv()
ctrl.send('ok')
print "%s is ready" % id
ctrl.close()
# enabling this pause it works as expected
# time.sleep(1)
print "sending"
for i in range(10):
# s.send(random.choice('AAB'), zmq.SNDMORE)
s.send('message %d' % i)
for id in 'AB':
# s.send(id, zmq.SNDMORE)
s.send('END')
for i, t in enumerate(threads):
print "waiting for thread", i, "to finish"
t.join()
print "closing"
s.close()
ctx.term()
def worker(ctx, id):
"""
A worker thread subscribed on a channel
"""
# start the subscriber
s = ctx.socket(zmq.SUB)
s.connect('tcp://localhost:9999')
s.setsockopt(zmq.SUBSCRIBE, '')
# s.setsockopt(zmq.IDENTITY, id)
# time.sleep(1) # another way to make it work
print id, "connected"
# signal the main thread we are listening ok
ctrl = ctx.socket(zmq.REQ)
ctrl.connect('tcp://localhost:9998')
ctrl.send(id)
assert 'ok' == ctrl.recv()
ctrl.close()
i = 0
while 1:
i += 1
print "%s receiving" % id
req = s.recv()
print "%s received: %s %s" % (id, i, req)
if req == 'END': break
time.sleep(0)
print "%s closing" % id
s.close()
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment