Skip to content

Instantly share code, notes, and snippets.

@jramseygreen
Last active March 26, 2024 15:21
Show Gist options
  • Save jramseygreen/a80c9788cfeee0df130195fee948339b to your computer and use it in GitHub Desktop.
Save jramseygreen/a80c9788cfeee0df130195fee948339b to your computer and use it in GitHub Desktop.
Implement SSE (server-sent events) for Flask
from flask import Blueprint, jsonify
from sse import sse
# Every blueprint file starts by creating its Blueprint object. The routes
# below attach themselves to it through the @api.route decorators.
api = Blueprint("api", __name__) # keep the variable name and the first argument equal to the file name
# Further blueprints can be registered on this one to split the API up, e.g.
#     api.register_blueprint(blueprint, url_prefix='/users')
# Nested routes are then served beneath this blueprint's own prefix
# (/api/users, assuming this blueprint is mounted at /api by the app).
# Root route of this blueprint (reached at /api when the blueprint is
# mounted there): a simple health check.
@api.route("/")
def heartbeat():
    """Return a small JSON document reporting that the service is up."""
    payload = {"status": "healthy"}
    return jsonify(payload)
# Opens an event stream for the caller. A unique id could be passed to
# sse.listen() here so that messages can later be sent to specific clients.
@api.route("/listen")
def listen():
    """Register the caller as an SSE listener and return its stream."""
    # sse.listen() yields a (client id, response) pair; only the response
    # is handed back to the caller.
    registration = sse.listen()
    return registration[1]
# Broadcasts a 'command' event to every connected SSE client.
@api.route('/ping')
def pingall():
    """Announce a fixed test payload to all listeners, then reply 200."""
    payload = {'data': 'data'}
    sse.announce('command', payload)
    return ({}, 200)
# Sends a 'command' event to one SSE client, selected by the integer id
# given in the URL.
@api.route('/ping/<int:client>')
def ping(client: int):
    """Send a fixed test payload to the listener *client*, then reply 200."""
    payload = {'data': 'data'}
    sse.send(client, 'command', payload)
    return ({}, 200)
from json import dumps
from queue import Empty, Full, Queue

from flask import Response
class __Sse:
    """In-process registry of server-sent-event (SSE) listeners.

    Every listener owns a small bounded queue of pending messages. ``send``
    and ``announce`` put formatted messages on those queues, and the
    generator created by ``listen`` drains its queue into a streaming Flask
    response. A listener whose queue is full is dropped from the registry.
    """

    # Sentinel that stop() places on a queue to make its stream finish.
    _STOP = 'stop'
    # Number of undelivered messages a listener may accumulate.
    _MAX_PENDING = 5

    def __init__(self):
        # Maps client id -> Queue of messages waiting to be streamed.
        self.__listeners = {}

    def listen(self, client: "int | None" = None) -> "tuple[int, Response]":
        """Register a listener and build its streaming response.

        Args:
            client: Id to register the listener under. When omitted, the
                identity of the listener's queue is used as its id.

        Returns:
            A ``(client, response)`` pair: the id the listener was
            registered under and a ``text/event-stream`` response.
        """
        q = Queue(maxsize=self._MAX_PENDING)
        # Compare against None so that an explicit id of 0 is honoured.
        if client is None:
            client = id(q)
        listeners = self.__listeners
        listeners[client] = q

        # generator for flask response
        def stream():
            try:
                while True:
                    msg = q.get()
                    if msg == self._STOP:
                        break
                    # NOTE(review): msg is already a complete
                    # 'event: ...\ndata: ...' frame built by send(), so it
                    # reaches the client JSON-encoded inside a 'message'
                    # event. Kept unchanged for wire compatibility;
                    # confirm this double wrapping is intended.
                    yield 'event: message\ndata: ' + dumps(msg) + '\n\n'
            finally:
                # Unregister once the stream finishes or the generator is
                # closed, so finished listeners do not pile up. Leave the
                # entry alone if the id was re-registered with a new queue.
                if listeners.get(client) is q:
                    del listeners[client]

        response = Response(stream(), content_type='text/event-stream')
        response.headers['Cache-Control'] = 'no-cache'
        # Asks buffering reverse proxies (e.g. nginx) to pass events through.
        response.headers['X-Accel-Buffering'] = 'no'
        return client, response

    def announce(self, event: str, data):
        """Send ``event`` with ``data`` to every registered listener."""
        # Iterate over a snapshot: send() may drop listeners while we loop,
        # which would otherwise raise "dictionary changed size".
        for client in list(self.__listeners):
            self.send(client, event, data)

    def send(self, client: int, event: str, data):
        """Queue one event for a single listener.

        Args:
            client: Id returned by ``listen``. Unknown ids are ignored.
            event: SSE event name.
            data: JSON-serialisable payload.

        A listener whose queue is already full is removed from the registry
        instead of blocking the sender.
        """
        msg = 'event: ' + event + '\ndata: ' + dumps(data) + '\n\n'
        q = self.__listeners.get(client)
        if q is None:
            # Unknown client, or one that was dropped in the meantime.
            return
        try:
            q.put_nowait(msg)
        except Full:
            self.__listeners.pop(client, None)

    def stop(self):
        """Ask every listener's stream to finish."""
        for q in list(self.__listeners.values()):
            try:
                q.put_nowait(self._STOP)
            except Full:
                # A backlogged listener must not abort the shutdown of the
                # others: discard its oldest message to make room.
                try:
                    q.get_nowait()
                except Empty:
                    pass
                try:
                    q.put_nowait(self._STOP)
                except Full:
                    # Refilled concurrently; give up on this listener.
                    pass


# Module-level singleton shared by everything that imports this module.
sse = __Sse()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment