Skip to content

Instantly share code, notes, and snippets.

@misuzu
Forked from mivade/tornadosse.py
Last active November 8, 2017 12:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save misuzu/98d006fb789cf3ce180428c353717fd0 to your computer and use it in GitHub Desktop.
Save misuzu/98d006fb789cf3ce180428c353717fd0 to your computer and use it in GitHub Desktop.
Tornado server-sent events
"""Demonstration of server-sent events with Tornado. To see the
stream, you can either point your browser to ``http://localhost:8080``
or use ``curl`` like so::

    $ curl http://localhost:8080/events
"""
import json
import signal
import tornado.concurrent
import tornado.gen
import tornado.ioloop
import tornado.web
import tornado.httpserver
import tornado.options
import tornado.iostream
class DataSource(object):
    """In-memory, append-only list of items that consumers can wait on.

    A consumer keeps the index up to which it has read and asks for
    everything stored from that index onwards, either immediately
    (:meth:`get_items`) or through a future (:meth:`wait_updates`).
    """

    def __init__(self, ioloop=None):
        """Create an empty, active source.

        :param ioloop: loop used to schedule and cancel wait timeouts;
            falls back to ``tornado.ioloop.IOLoop.current()``.
        """
        self._ioloop = ioloop or tornado.ioloop.IOLoop.current()
        self._futures = set()  # futures handed out by wait_updates, still pending
        self._items = []
        self._active = True

    @property
    def active(self):
        """``True`` until :meth:`disable` has been called."""
        return self._active

    def disable(self):
        """Mark the source as inactive."""
        self._active = False

    def update(self, items):
        """Append *items* and resolve every pending waiter.

        Each waiter's future receives ``(new_length, items)`` where
        ``new_length`` is the number of stored items after the append.

        :param items: iterable of new items to store.
        """
        # Swap in a fresh set so the waiters resolved below are no longer
        # tracked by this source.
        futures, self._futures = self._futures, set()
        self._items.extend(items)
        for future in futures:
            # A tracked future may already be finished (timed out or
            # cancelled) if its done-callback has not run yet; calling
            # set_result() on it again would raise and starve the
            # remaining waiters.
            if not future.done():
                future.set_result((len(self._items), items))

    def get_items(self, index):
        """Return the stored items from position *index* onwards."""
        return self._items[index:]

    def wait_updates(self, index, timeout=5):
        """Return a future that resolves to ``(next_index, items)``.

        If items already exist from *index* onwards the future is resolved
        immediately.  Otherwise it is resolved by the next :meth:`update`
        call, or with an empty item list once *timeout* seconds elapse.

        :param index: position of the first item the caller has not seen.
        :param timeout: seconds to wait before resolving with ``[]``.
        :returns: a ``tornado.concurrent.Future``.
        """
        future = tornado.concurrent.Future()
        items = self.get_items(index)
        if items:
            future.set_result((len(self._items), items))
            return future

        def on_timeout():
            # update() may have resolved the future while this timer was
            # already due; resolving twice raises InvalidStateError.
            if not future.done():
                future.set_result((len(self._items), []))

        def on_done(_):
            # Whichever path resolved the future, drop the other one.
            self._ioloop.remove_timeout(handle)
            self._futures.discard(future)

        self._futures.add(future)
        handle = self._ioloop.call_later(timeout, on_timeout)
        future.add_done_callback(on_done)
        return future
class EventSource(tornado.web.RequestHandler):
    """Stream the items of a data source to the client as server-sent events."""

    def initialize(self, source):
        """Store the data source supplied in the URL spec.

        :param source: object exposing ``active``, ``get_items(index)`` and
            ``wait_updates(index)``.
        """
        self.source = source

    @tornado.gen.coroutine
    def get(self):
        """Send events starting at the position given by ``Last-Event-ID``.

        Responds with 400 when the header is not a non-negative integer and
        with 404 when the source is inactive and has nothing left to send
        from that position.  Otherwise keeps writing events until the client
        disconnects or the source becomes inactive.
        """
        # The header is client-controlled: a non-numeric value would raise
        # ValueError (HTTP 500) and a negative one would slice from the end
        # of the item list, so reject both explicitly.
        try:
            index = int(self.request.headers.get('Last-Event-ID', '0'))
        except ValueError:
            index = -1
        if index < 0:
            self.send_error(400)
            return
        if not (self.source.active or self.source.get_items(index)):
            self.send_error(404)
            return
        self.set_header('Content-Type', 'text/event-stream; charset=utf-8')
        self.set_header('Cache-Control', 'no-cache')
        self.set_header('Connection', 'keep-alive')
        while True:
            index, items = yield self.source.wait_updates(index)
            if items:
                # The id carries the next index so a reconnecting client
                # can send it back as Last-Event-ID.
                msg = 'data: %s\nid: %s\n\n' % (json.dumps(items), index)
            else:
                # No items were returned: emit a data-less event so a
                # closed connection is noticed on write.
                msg = 'event: ping\n\n'
            try:
                self.write(msg)
                yield self.flush()
            except tornado.iostream.StreamClosedError:
                # Client went away; nothing left to finish.
                return
            if not self.source.active:
                break
        self.finish()
class MainHandler(tornado.web.RequestHandler):
    """Serve the demo page that subscribes to ``/events``."""

    def get(self):
        """Respond with an HTML page whose script appends each received
        message's data to the ``messages`` div."""
        page = """
<div id="messages"></div>
<script type="text/javascript">
var source = new EventSource('/events');
source.onmessage = function(message) {
var div = document.getElementById("messages");
div.innerHTML = div.innerHTML + "<br>" + message.data;
};
</script>"""
        self.finish(page)
def fibonacci():
    """Yield the Fibonacci numbers 0, 1, 1, 2, 3, 5, ... indefinitely."""
    current = 0
    following = 1
    while True:
        yield current
        total = current + following
        current = following
        following = total
if __name__ == "__main__":
    # Demo entry point: publish one Fibonacci number per second and serve
    # the page and the event stream on port 8080 until SIGINT.
    tornado.options.options.parse_command_line()
    ioloop = tornado.ioloop.IOLoop.current()
    publisher = DataSource(ioloop)
    generator = fibonacci()

    def publish_next():
        # Push the next number of the sequence to every waiting client.
        publisher.update([next(generator)])

    def stop_on_sigint(signum, frame):
        # Ctrl-C: stop the loop so ioloop.start() below returns.
        ioloop.stop()

    checker = tornado.ioloop.PeriodicCallback(publish_next, 1000)
    checker.start()

    routes = [
        (r'/', MainHandler),
        (r'/events', EventSource, dict(source=publisher)),
    ]
    app = tornado.web.Application(routes, debug=True)
    server = tornado.httpserver.HTTPServer(app)
    server.listen(8080)

    signal.signal(signal.SIGINT, stop_on_sigint)
    ioloop.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment