public
Created

Tornado websockets and ZMQ pubsub

  • Download Gist
forwarder.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
import zmq
 
def main():
try:
context = zmq.Context(1)
 
frontend = context.socket(zmq.SUB)
frontend.bind('tcp://*:5559')
frontend.setsockopt(zmq.SUBSCRIBE, '')
 
backend = context.socket(zmq.PUB)
backend.bind('tcp://*:5560')
 
print 'starting zmq forwarder'
zmq.device(zmq.FORWARDER, frontend, backend)
except KeyboardInterrupt:
pass
except Exception as e:
logger.exception(e)
finally:
frontend.close()
backend.close()
context.term()
 
if __name__ == '__main__':
main()
publish.py
Python
1 2 3 4 5 6 7 8
import zmq
 
if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect('tcp://127.0.0.1:5559')
socket.send('session_id helloworld')
print 'sent data for channel session_id'
ws.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()
 
from tornado.websocket import WebSocketHandler
from tornado.web import Application
from tornado.ioloop import IOLoop
ioloop = IOLoop.instance()
 
class ZMQPubSub(object):
 
def __init__(self, callback):
self.callback = callback
 
def connect(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.connect('tcp://127.0.0.1:5560')
self.stream = ZMQStream(self.socket)
self.stream.on_recv(self.callback)
 
def subscribe(self, channel_id):
self.socket.setsockopt(zmq.SUBSCRIBE, channel_id)
 
class MyWebSocket(WebSocketHandler):
 
def open(self):
self.pubsub = ZMQPubSub(self.on_data)
self.pubsub.connect()
self.pubsub.subscribe("session_id")
print 'ws opened'
 
def on_message(self, message):
print message
def on_close(self):
print 'ws closed'
 
def on_data(self, data):
print data
 
def main():
application = Application([(r'/channel', MyWebSocket)])
application.listen(10001)
print 'starting ws on port 10001'
ioloop.start()
 
if __name__ == '__main__':
main()

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.