Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save chansdad/45b77d22257fb5e1c0006a5b4045374b to your computer and use it in GitHub Desktop.
Save chansdad/45b77d22257fb5e1c0006a5b4045374b to your computer and use it in GitHub Desktop.
Playing with threads, greenlets and ZMQ
import gevent
from gevent import monkey; monkey.patch_all()
from gevent_zeromq import zmq
import simplejson as json
import threading
import time
def server(context, runid=1234):
    """Run a REP server on an in-process endpoint, answering forever.

    Binds a REP socket to ``inproc://<runid>`` and then loops: receive one
    request, sleep for a second to simulate work, reply with ``"World"``.

    Args:
        context: ZMQ context used to create the socket. For ``inproc://``
            transport, peers must create their sockets from this same
            context.
        runid: Identifier used to build the endpoint name. Defaults to
            1234, which is the endpoint the clients in this file connect to.

    The loop has no normal exit; it only ends when an exception propagates
    out of it. The socket is closed on the way out so it is not leaked.
    """
    # Socket to talk to the clients.
    socket = context.socket(zmq.REP)
    try:
        print("Socket bound on %s ..." % (runid))
        rc = socket.bind("inproc://%s" % (runid))
        # Two spaces after "=" reproduce the output of the original
        # Python 2 statement `print "rc = ", rc`.
        print("rc =  %s" % (rc,))
        # socket.bind("tcp://*:8040")
        while True:
            request = socket.recv()
            print("Received request for : [%s]" % (request))
            # Do some 'work'. Try different sleep values to see how the
            # server copes with more clients.
            time.sleep(1)
            # Send the reply back to the client.
            socket.send("World")
    finally:
        # Do not wait for unsent messages when closing.
        socket.setsockopt(zmq.LINGER, 0)
        socket.close()
        # WARNING - the context is intentionally NOT terminated here: any
        # active clients sharing the same context would be broken by
        # context.term().
def client(context, i, runid=1234, request_timeout=5000):
    """Send one JSON request to the in-process server and wait for a reply.

    Inspired by the Lazy Pirate client from the ZeroMQ guide
    (http://zguide2.zeromq.org/py:lpclient), but without retries: the
    request is sent once and the reply is awaited once.

    Args:
        context: ZMQ context used to create the REQ socket. For
            ``inproc://`` transport this must be the context the server
            socket was created from.
        i: Client identifier; included in the request payload and in the
            log output.
        runid: Identifier used to build the endpoint name. Defaults to
            1234, matching the server in this file.
        request_timeout: How long to wait for the reply, in milliseconds.

    Returns:
        The reply received from the server, or None when nothing arrived
        within ``request_timeout``.
    """
    poller = zmq.Poller()
    # Socket to talk to the server.
    sock = context.socket(zmq.REQ)
    poller.register(sock, zmq.POLLIN)
    try:
        print("[Client %s] Connecting to Outbound Server %s ..." % (i, runid))
        sock.connect("inproc://%s" % (runid))
        args = {'client': i, 'params': 'Hello'}
        print("Sending request with args = %s" % (args))
        sock.send(json.dumps(args))

        # Wait once for the reply; a REQ socket gets exactly one reply per
        # request, so there is nothing to loop over.
        events = dict(poller.poll(request_timeout))
        if events.get(sock) == zmq.POLLIN:
            return sock.recv()

        print("No response from Outbound Server")
        return None
    finally:
        # Always release the socket, even if connect/send/poll raised.
        poller.unregister(sock)
        sock.setsockopt(zmq.LINGER, 0)
        sock.close()
def server_thread(context):
    """Thread entry point: launch the server greenlet and return.

    Args:
        context: ZMQ context handed on to ``server``.
    """
    print("Server thread started ...")
    # The greenlet is not joined: server() loops forever, so waiting on it
    # here would never return.
    gevent.spawn(server, context)
    print("Server Greenlet started - ")
def client_thread(context):
    """Thread entry point: run ten client greenlets and print their results.

    Args:
        context: ZMQ context handed on to every ``client`` greenlet.
    """
    print("Client thread started ...")
    num_clients = 10
    workers = []
    for client_id in range(num_clients):
        workers.append(gevent.spawn(client, context, client_id))
    print("Client Greenlets started - ")
    # Block until every client has either a reply or a timeout.
    gevent.joinall(workers)
    replies = [worker.value for worker in workers]
    print(replies)
if __name__ == '__main__':
    # Prepare the context.
    # Create and use exactly one context per process for everything that
    # has to talk across threads: "inproc://" endpoints are only visible
    # within a single context. Several contexts may coexist in one
    # application, but they cannot reach each other over "inproc://".
    context = zmq.Context(1)

    # Order matters: the server thread goes first because "bind" should
    # happen before "connect". Each thread receives the shared context.
    for entry_point in (server_thread, client_thread):
        threading.Thread(target=entry_point, args=(context, )).start()

    # Note - Thread Safety
    # 1. A ZMQ context is thread safe; it may be shared among as many
    #    application threads as necessary without any extra locking by the
    #    caller.
    # 2. Individual ZMQ sockets are NOT thread safe, except when full memory
    #    barriers are issued while migrating a socket from one thread to
    #    another. In practice: a socket can be created in one thread with
    #    zmq_socket() and then handed to a newly created thread as part of
    #    its initialization, e.g. through a structure passed as an argument
    #    to pthread_create().
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment