Skip to content

Instantly share code, notes, and snippets.

@szastupov
Created July 6, 2016 13:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save szastupov/de3e3f884415892b3bf22c906b490a11 to your computer and use it in GitHub Desktop.
Save szastupov/de3e3f884415892b3bf22c906b490a11 to your computer and use it in GitHub Desktop.
websocket/long polling event bus
import logging
from tornado.websocket import WebSocketHandler
from tornado.web import asynchronous
from tornado.ioloop import PeriodicCallback
from helpers.event_bus import bus
from helpers.api import CommonHandlerMixin
from helpers.auth import http_token
from functools import partial
from . import routes
logger = logging.getLogger(__name__)
@routes.add("/api/v1/events")
class EventsHandler(WebSocketHandler, CommonHandlerMixin):
    """Stream event-bus messages to a user over a web socket or long polling.

    Both transports subscribe to the per-user bus channel ``user_<id>`` and
    differ only in how they authorize and how payloads are written back:
    web sockets authorize with a ``Token...`` message and use
    ``write_message``; long polling authorizes via ``self.authorize()`` and
    streams chunks with ``write`` + ``flush``.
    """

    # Interval between keep-alive pings on an authorized web socket (ms).
    PING_INTERVAL_MS = 20000

    # Class-level defaults so that on_close() is safe even when neither
    # open() nor the long-polling branch of get() got a chance to run
    # (e.g. a rejected handshake or an exception raised while authorizing).
    channel = None
    callback = None
    periodic = None

    @asynchronous
    def get(self):
        """Receive event stream for authenticated user

        Description: |
            This endpoint can be used in Web Socket and Long Polling modes.
            To use it as web socket, connect with ws:// protocol and send
            "Token: ..." message to authorize. Then you'll start receiving
            a message for each event. To use it in long polling mode,
            just GET this endpoint with typical Authorization header and
            you will receive events in HTTP chunks.
        """
        if self.request.headers.get("Upgrade"):
            # This is a Web Socket connection
            return super().get()

        # Fallback to long polling
        self.current_user = self.authorize()
        self.channel = None
        self.periodic = None
        if not self.current_user:
            self.set_status(401)
            self.write_json({"code": 401, "message": "Not authorized"})
            self.finish()
            return

        self.subscribe(self.write_event)
        self.flush()  # flush headers

    def subscribe(self, callback):
        """Subscribe ``callback`` to the current user's bus channel.

        Stores the channel name and the value returned by ``bus.subscribe``
        so that ``on_close`` can undo the subscription.
        """
        self.channel = "user_%d" % self.current_user
        self.callback = bus.subscribe(self.channel, callback)

    def write_event(self, payload):
        """Long-polling sink: write one payload and flush it as a chunk."""
        self.write(payload)
        self.flush()

    def open(self):
        """Prepare web socket state; the ping timer starts after authorization."""
        self.channel = None
        self.periodic = PeriodicCallback(
            partial(self.ping, b""), self.PING_INTERVAL_MS
        )

    def on_close(self):
        """Drop the bus subscription and stop the keep-alive timer, if any."""
        if self.channel:
            bus.unsubscribe(self.channel, self.callback)
            # Clear the channel so a repeated call cannot unsubscribe twice.
            self.channel = None
        if self.periodic:
            self.periodic.stop()

    def on_message(self, msg):
        """Handle a web socket message; only ``Token...`` messages are accepted.

        A valid token subscribes the socket to the user's channel, confirms
        with ``{"authorized": True}`` and starts the keep-alive pings. Every
        other outcome is reported to the client as an error message.
        """
        # Binary frames arrive as bytes; bytes.startswith(str) would raise
        # TypeError, so treat them like any other non-token message.
        if not isinstance(msg, str) or not msg.startswith("Token"):
            self.write_error(400, "Expected Token")
            return

        if self.channel:
            self.write_error(400, "Socket is already authorized")
            return

        self.current_user = http_token(msg)
        if not self.current_user:
            self.write_error(403, "Invalid Token")
            return

        self.subscribe(self.write_message)
        self.write_message({"authorized": True})
        self.periodic.start()

    def write_error(self, code, message=None, **kwargs):
        """Report an error on whichever transport this request is using.

        Tornado itself calls ``write_error(status_code, **kwargs)`` from
        ``send_error`` for plain HTTP failures, so the signature has to stay
        compatible with that hook. Without an established web socket the call
        is delegated to the inherited HTTP implementation; otherwise the
        error is sent to the client as a web socket message.
        """
        if self.ws_connection is None:
            if message is not None:
                kwargs["message"] = message
            super().write_error(code, **kwargs)
            return

        self.write_message({
            "error": code,
            "message": message
        })

    def check_origin(self, origin):
        """Accept web socket connections from any origin."""
        # NOTE(review): this disables Tornado's same-origin check; access
        # then depends on the token sent in on_message -- confirm intended.
        return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment