Skip to content

Instantly share code, notes, and snippets.

@minrk
Created September 27, 2012 21:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save minrk/3796673 to your computer and use it in GitHub Desktop.
Save minrk/3796673 to your computer and use it in GitHub Desktop.
import json, time, threading
import zmq
from zmq.eventloop import ioloop, zmqstream
class Logger(threading.Thread):
    """Thread that acknowledges every message arriving on a ROUTER socket.

    run() binds a ROUTER socket on tcp://127.0.0.1:48172, wraps it in a
    ZMQStream and drives it with an IOLoop until stop() is called.  Each
    incoming message is answered with a JSON status dict routed back to
    the peer that sent it.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the thread; the socket and stream are created in run().

        Extra positional/keyword arguments are accepted but not used.
        """
        threading.Thread.__init__(self)
        self.rtrSocket = None
        self.stream = None
        # Event loop driving the stream.  Captured in run() so that stop()
        # addresses the loop this thread is actually running.
        self._loop = None

    def msgReceived(self, recvStream, msg):
        """Handle one received multipart message and reply with a status.

        recvStream -- the stream the message arrived on; the reply is sent
                      through it.
        msg        -- list of message frames; msg[0] is used as the routing
                      identity of the reply.
        """
        statusDict = {"success": True, "msg": ""}
        print("{0} Message received: {1}".format(time.time(), msg))
        # send identity prefix for routing:
        recvStream.send(msg[0], zmq.SNDMORE)
        try:
            print("{0} Sending status: {1}".format(time.time(), json.dumps(statusDict)))
            recvStream.send_json(statusDict)
        except Exception:
            # Best effort: report the failure and keep serving.  Unlike a bare
            # ``except:`` this lets KeyboardInterrupt/SystemExit propagate.
            print("{0} Encountered an error sending item".format(time.time()))

    def stop(self):
        """Ask the event loop started by run() to stop.

        Intended to be called from a thread other than the one executing
        run().  The stop is scheduled with add_callback, the IOLoop method
        documented as safe to call from other threads, instead of calling
        IOLoop.stop() directly.
        """
        print("{0} Stopping logger listener".format(time.time()))
        loop = self._loop
        if loop is None:
            # run() has not captured its loop yet; fall back to the shared
            # instance, as the original implementation did.
            loop = ioloop.IOLoop.instance()
        loop.add_callback(loop.stop)

    def run(self):
        """Thread body: bind the socket, serve until stopped, then clean up."""
        # Capture the loop first so a concurrent stop() targets it.
        self._loop = ioloop.IOLoop.instance()
        context = zmq.Context.instance()
        self.rtrSocket = context.socket(zmq.ROUTER)
        # Socket identities are bytes; a bytes literal works on Python 2 and 3.
        self.rtrSocket.setsockopt(zmq.IDENTITY, b"LOGGER")
        self.rtrSocket.setsockopt(zmq.LINGER, 1000)
        try:
            self.rtrSocket.bind("tcp://127.0.0.1:48172")
            print("{0} Running...".format(time.time()))
            self.stream = zmqstream.ZMQStream(self.rtrSocket, self._loop)
            self.stream.on_recv_stream(self.msgReceived)
            self._loop.start()  # blocks until the loop is stopped
        finally:
            # Release the stream and socket even if bind() or the loop raised.
            print("{0} Cleaning up...".format(time.time()))
            if self.stream is not None:
                self.stream.stop_on_recv()
                self.stream.close()
            self.rtrSocket.close()
if __name__ == '__main__':
    # Demo driver: start the Logger thread, send it five messages over a
    # DEALER socket, print each JSON reply, then shut the thread down.
    ctx = zmq.Context.instance()
    sender = ctx.socket(zmq.DEALER)
    sender.connect("tcp://127.0.0.1:48172")
    logger = Logger()
    logger.start()
    try:
        for i in range(5):
            # Format as text, then encode: bytes objects have no .format()
            # on Python 3, so b'hi {0}'.format(i) fails there.
            sender.send('hi {0}'.format(i).encode('ascii'))
            print("{0} reply received {1}".format(time.time(), sender.recv_json()))
            time.sleep(1)
    finally:
        # Always stop and join the logger thread, even if a send/recv failed;
        # otherwise the non-daemon thread keeps the process alive.
        logger.stop()
        logger.join()
        # Nothing still queued on the sender is needed once we are done.
        sender.close(linger=0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment