public
Last active

Server-Sent Events in Flask

  • Download Gist
flask sse
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
#!/bin/python2.7
# -*- coding: utf-8 -*-
 
"""
Test for Server-Sent Events (SSE) in Flask.
 
inspiration from:
http://www.html5rocks.com/en/tutorials/eventsource/basics/
https://github.com/niwibe/sse.git
https://github.com/niwibe/django-sse.git
https://github.com/jkbr/chat
"""
 
from __future__ import unicode_literals
import time
import sys
from flask import Response
from flask.views import View
 
 
class Sse(object):
    """Buffer that renders Server-Sent Events (``text/event-stream``) text.

    Messages are collected per event name with :meth:`add_message`; optional
    ``retry`` and ``id`` fields are stored with :meth:`set_retry` and
    :meth:`set_event_id`.  Iterating the instance yields the wire-format
    chunks, and :meth:`flush` empties the buffer again.

    Output is grouped by event name, so insertion order is only preserved
    among messages that share the same event name.
    """

    def __init__(self):
        self._buffer = {}
        self._buffer['messages'] = {}

    def set_retry(self, num):
        """Store the value emitted in the ``retry`` field."""
        self._buffer['retry'] = num

    def set_event_id(self, event_id):
        """Store the value emitted in the ``id`` field.

        ``None`` (or an empty string) makes the next output carry a bare
        ``id`` line, which resets the client's last event id.
        """
        self._buffer['id'] = event_id

    def reset_event_id(self):
        """Request a bare ``id`` line (event id reset) in the next output."""
        self.set_event_id(None)

    def _parse_text(self, text, encoding='utf-8'):
        """Normalise *text* to newline-terminated text.

        :param text: a text/bytes value, any object convertible to text, or a
            list/tuple/set of such values (each item becomes its own line).
        :param encoding: codec used to decode ``bytes`` input.
        :returns: the text with one trailing ``'\\n'`` per item.
        """
        if isinstance(text, (list, tuple, set)):
            # Every item is already newline-terminated by the recursive call,
            # so no extra terminator is appended for the collection itself.
            return ''.join(self._parse_text(item, encoding) for item in text)

        if isinstance(text, bytes):
            text = text.decode(encoding)

        # type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3; using
        # it avoids the implicit ASCII encode that ``str()`` performs on
        # Python 2 for non-ASCII text.
        text_type = type(u'')
        return text_type(text) + '\n'

    def add_message(self, text, event='message'):
        """
        Add message with eventname to the buffer.

        :param text: payload, see :meth:`_parse_text` for accepted values.
        :param event: event name the payload is filed under.
        """
        event_list = self._buffer['messages'].setdefault(event, [])
        event_list.append(self._parse_text(text))

    def __str__(self):
        if sys.version_info[0] >= 3:  # Python 3
            return self.__unicode__()
        return self.__unicode__().encode('utf8')

    def __unicode__(self):
        return ''.join(self)

    def flush(self):
        """
        Reset the internal buffer to initial state.
        """
        self._buffer.clear()
        self._buffer['messages'] = {}

    def __iter__(self):
        """Yield the buffered fields and messages as event-stream chunks.

        Order: ``retry`` (if set), ``id`` (if set), then for every buffered
        message an ``event:`` chunk followed by its ``data:`` chunk.  The
        buffer is not modified; call :meth:`flush` to clear it.
        """
        if 'retry' in self._buffer:
            yield "retry: {0}\n\n".format(self._buffer['retry'])

        if 'id' in self._buffer:
            event_id = self._buffer['id']
            # Only None / empty string mean "reset"; a falsy but real id such
            # as 0 must still be sent as a value.
            if event_id is None or event_id == '':
                yield "id\n\n"  # Reset event id
            else:
                yield "id: {0}\n\n".format(event_id)

        for eventname in self._buffer['messages']:
            for message in self._buffer['messages'][eventname]:
                # Drop the terminator appended by _parse_text, then split on
                # CR / LF / CRLF so each payload line gets its own "data: "
                # prefix; unprefixed continuation lines would be ignored by
                # the client.
                body = message[:-1] if message.endswith('\n') else message
                lines = body.replace('\r\n', '\n').replace('\r', '\n').split('\n')
                yield "event: {0}\n".format(eventname)
                # The trailing blank line terminates (dispatches) the event.
                yield ''.join("data: {0}\n".format(line) for line in lines) + "\n"
 
 
class SseStream(View):
    """Base Flask view that streams the contents of an ``Sse`` buffer.

    Subclasses implement :meth:`_compose_message`; it is expected to add
    messages to ``self.sse`` and return a truthy value while the stream
    should stay open.
    """

    def get_last_id(self):
        """Return the ``Last-Event-ID`` request header, or ``None`` if absent.

        NOTE(review): ``flask.request`` is only usable while a request
        context is active, so this should be called from
        ``dispatch_request`` rather than from the streaming generator --
        confirm against the Flask version in use.
        """
        # Imported locally because the module only imports ``Response`` from
        # flask at file level.
        from flask import request
        return request.headers.get('Last-Event-ID')

    def _compose_message(self):
        """Fill ``self.sse`` for the next chunk; must be overridden.

        :returns: truthy to keep streaming, falsy to end the stream.
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def _iterator(self):
        """Yield event-stream chunks until ``_compose_message`` is falsy."""
        while self._compose_message():
            for line in self.sse:
                yield line
            # Everything buffered has been sent; start the next round empty.
            self.sse.flush()

    def dispatch_request(self):
        """Create a fresh buffer and return the streaming response."""
        self.sse = Sse()
        response = Response(self._iterator(), mimetype="text/event-stream")
        return response
 
 
class PeriodicStream(SseStream):
    """Stream whose messages are produced by functions run on a fixed tick.

    ``functions`` is a mapping whose values are ``(freq, func)`` pairs; on
    every tick where the tick counter is a multiple of ``freq`` the function
    is called as ``func(stream, freq)``.
    """

    # Seconds slept between two ticks.
    TICK_SECONDS = 0.1
    # Number of ticks after which the counter wraps back to 0.
    TICKS_PER_CYCLE = 600

    def __init__(self, functions):
        self.functions = functions
        self.counter = 0

    def _compose_message(self):
        """Wait one tick, run every function that is due, keep streaming.

        :returns: always ``True``; the stream never ends on its own.
        """
        time.sleep(self.TICK_SECONDS)
        # Wrap so the counter stays in [0, TICKS_PER_CYCLE).  Wrapping only
        # after exceeding the limit would visit both 600 and 0 on consecutive
        # ticks and fire every divisor-of-600 frequency twice in a row.
        # Frequencies that do not divide TICKS_PER_CYCLE still see one
        # shortened interval at the wrap.
        self.counter = (self.counter + 1) % self.TICKS_PER_CYCLE
        for freq, func in self.functions.values():
            if self.counter % freq == 0:
                func(self, freq)
        return True
 
 
class RedisSseStream(SseStream):
    """Stream driven by messages received on the ``'stream'`` pub/sub channel.

    ``handlers`` maps a message payload to a callable invoked as
    ``handler(stream)``; ``pubsub`` is an object providing ``subscribe()``
    and ``listen()`` (presumably a redis-py ``PubSub`` -- confirm with the
    caller).
    """

    def __init__(self, handlers, pubsub):
        self.handlers = handlers
        pubsub.subscribe('stream')
        self._messages = pubsub.listen()

    def _compose_message(self):
        """Block for the next pub/sub message and run its handler, if any.

        :returns: ``True`` after a message was processed, ``False`` once the
            listener is exhausted so the stream ends cleanly.
        """
        try:
            # Builtin next() works on Python 2.6+ and 3; the ``.next()``
            # method only exists on Python 2 iterators.
            message = next(self._messages)
        except StopIteration:
            # Letting StopIteration escape into the streaming generator
            # becomes a RuntimeError on Python 3.7+.
            return False

        key = message['data']
        # NOTE(review): on Python 3 the payload may arrive as bytes while
        # handlers are keyed by text; fall back to the decoded key then.
        if key not in self.handlers and isinstance(key, bytes):
            key = key.decode('utf-8', 'replace')
        if key in self.handlers:
            self.handlers[key](self)
        return True

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.