Skip to content

@Chris2048 /flask sse
Last active

Embed URL


Subversion checkout URL

You can clone with
Download ZIP
Server-Side Events in flask
# -`*- coding: utf-8 -*-
test for Server-Side events in flask
inspiration from:
from __future__ import unicode_literals
import time
import sys
from flask import Response
from flask.views import View
class Sse(object):
def __init__(self):
self._buffer = {}
self._buffer['messages'] = {}
def set_retry(self, num):
self._buffer['retry'] = num
def set_event_id(self, event_id):
self._buffer['id'] = event_id
def reset_event_id(self):
def _parse_text(self, text, encoding='utf-8'):
if isinstance(text, (list, tuple, set)):
text = ''.join(self._parse_text(i) for i in text)
if isinstance(text, bytes):
text = text.decode(encoding)
return str(text) + '\n'
def add_message(self, text, event='message'):
Add message with eventname to the buffer.
event_list = self._buffer['messages'].setdefault(event, [])
def __str__(self):
if sys.version_info[0] >= 3: # Python 3
return self.__unicode__()
return self.__unicode__().encode('utf8')
def __unicode__(self):
return ''.join(i for i in self)
def flush(self):
Reset the internal buffer to initial state.
self._buffer['messages'] = {}
def __iter__(self):
if 'retry' in self._buffer:
yield "retry: {0}\n\n".format(self._buffer['retry'])
if 'id' in self._buffer:
if self._buffer['id']:
yield "id: {0}\n\n".format(self._buffer['id'])
yield "id\n\n" # Reset event id
for eventname in self._buffer['messages']:
for message in self._buffer['messages'][eventname]:
yield "event: {0}\n".format(eventname)
yield "data: {0}\n".format(message)
class SseStream(View):
def get_last_id(self):
if "HTTP_LAST_EVENT_ID" in self.request.META:
return self.request.META['HTTP_LAST_EVENT_ID']
return None
def _compose_message():
raise NotImplementedError
def _iterator(self):
while self._compose_message():
for line in self.sse:
yield line
def dispatch_request(self):
self.sse = Sse()
response = Response(self._iterator(), mimetype="text/event-stream")
return response
class PeriodicStream(SseStream):
def __init__(self, functions):
self.functions = functions
self.counter = 0
def _compose_message(self):
self.counter += 1
if self.counter > 600:
self.counter = 0
for freq, func in self.functions.values():
if self.counter % freq == 0:
func(self, freq)
return True
class RedisSseStream(SseStream):
def __init__(self, handlers, pubsub):
self.handlers = handlers
self._messages = pubsub.listen()
def _compose_message(self):
message =['data']
if message in self.handlers.keys():
return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.