Skip to content

Instantly share code, notes, and snippets.

@dvarrazzo
Created March 31, 2012 20:48
Show Gist options
  • Save dvarrazzo/2268338 to your computer and use it in GitHub Desktop.
Save dvarrazzo/2268338 to your computer and use it in GitHub Desktop.
zmq eventlet tests
import sys
import zmq
try:
n = int(sys.argv[1])
except:
n = 10
ctx = zmq.Context()
req = ctx.socket(zmq.REQ)
req.connect("tcp://localhost:5555")
for i in range(n):
print "Sending Hello %d..." % i
req.send("Hello %s" % i)
reply = req.recv()
print "Received %s %d" % (reply, i)
import sys
def is_eventlet():
    """Return True when the first command-line argument is 'eventlet'.

    Returns False when there is no first argument or it is anything else.
    """
    # Slicing never raises: with no argument the slice is [] and the
    # comparison is simply False.
    return sys.argv[1:2] == ['eventlet']
if is_eventlet():
print "going eventlet"
import eventlet
from eventlet.green import zmq
from eventlet.green import time
from eventlet.green import threading
else:
print "going thread"
import zmq
import time
import threading
def worker(ctx, i):
print "worker", i, "started"
sock = ctx.socket(zmq.REP)
sock.connect('tcp://localhost:5560')
while 1:
msg = sock.recv()
print "worker", i, "got", msg
time.sleep(1)
sock.send('World')
def queue(frontend, backend):
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)
while 1:
socks = dict(poller.poll())
if socks.get(frontend) == zmq.POLLIN:
while 1:
s = frontend.recv()
print "got msg from frontend: [%s]" % (
",".join(map(str, map(ord, s))) if s and s[0] == '\x00' else s)
more = frontend.getsockopt(zmq.RCVMORE)
backend.send(s, more and zmq.SNDMORE or 0)
if not more:
break
if socks.get(backend) == zmq.POLLIN:
while 1:
s = backend.recv()
print "got msg from backend: [%s]" % (
",".join(map(str, map(ord, s))) if s and s[0] == '\x00' else s)
more = backend.getsockopt(zmq.RCVMORE)
frontend.send(s, more and zmq.SNDMORE or 0)
if not more:
break
def queue_eventlet(frontend, backend):
def f1():
print "queue thread 1"
while 1:
while 1:
s = frontend.recv()
print "got msg from frontend: [%s]" % (
",".join(map(str, map(ord, s))) if s and s[0] == '\x00' else s)
more = frontend.getsockopt(zmq.RCVMORE)
backend.send(s, more and zmq.SNDMORE or 0)
if not more:
break
def f2():
print "queue thread 2"
while 1:
while 1:
s = backend.recv()
print "got msg from backend: [%s]" % (
",".join(map(str, map(ord, s))) if s and s[0] == '\x00' else s)
more = backend.getsockopt(zmq.RCVMORE)
frontend.send(s, more and zmq.SNDMORE or 0)
if not more:
break
eventlet.spawn(f1)
eventlet.spawn(f2)
while 1:
print "sleep"
time.sleep(5)
def main():
    """Bind the two relay sockets, start five workers and run the relay.

    A ROUTER socket is bound on port 5555 and a DEALER socket on port 5560.
    Five daemon threads running worker() are started on the shared context,
    then queue_eventlet() or queue() is run depending on is_eventlet().
    Neither relay function returns.
    """
    ctx = zmq.Context()

    frontend = ctx.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5555")

    backend = ctx.socket(zmq.DEALER)
    backend.bind("tcp://*:5560")

    for worker_id in range(5):
        thread = threading.Thread(target=worker, args=(ctx, worker_id))
        # Daemon threads do not keep the process alive once main() exits.
        thread.daemon = True
        thread.start()

    relay = queue_eventlet if is_eventlet() else queue
    relay(frontend, backend)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment