@jelmervdl
Created December 1, 2016 11:03
Pure Python & Flask server-side event source
<script>
    var stream = new EventSource('/api/stream');
    stream.onmessage = function(e) {
        console.info(e.data);
    };
</script>

from queue import Queue
from flask import Flask, Response, render_template, request

app = Flask(__name__)
app.debug = True

queue = Queue()


def event_stream():
    while True:
        # Question: how to break out of this loop when the app is killed?
        message = queue.get(True)
        print("Sending {}".format(message))
        yield "data: {}\n\n".format(message)


@app.route('/')
def hello():
    return render_template('index.html')


@app.route('/api/stream')
def stream():
    # Stream the generator's output to the client as server-sent events.
    return Response(event_stream(), mimetype="text/event-stream")


@app.route('/api/post', methods=['GET'])
def api_parse_sentence():
    queue.put(request.args.get('sentence'))
    return "OK"


if __name__ == '__main__':
    app.run(threaded=True)
@jelmervdl
Author

Werkzeug (which does the heavy lifting inside Flask) calls sys.exit(3) when it wants to reload. However, this does not force the thread that is blocked on the queue inside the while-loop to quit, which effectively breaks reloading. (That is very unfortunate, because I wanted to use the event stream specifically to notice reloads of the back-end.)

@jelmervdl
Author

queue.get accepts a timeout as its second parameter. Maybe a 500 ms timeout would be enough to notice the restart quickly enough?
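A rough sketch of what that timeout idea could look like, using only the standard library so it can be tried without Flask. The `stop_requested` flag is an assumption of mine, not something Flask or Werkzeug provides: something else (a signal handler, for instance) would have to set it, and I have not verified that this is enough to make the reloader happy. When the timeout expires, `queue.get` raises `queue.Empty`; the sketch uses that moment to send an SSE comment line (a line starting with `:`) as a keep-alive.

```python
import queue
import threading

# Hypothetical shutdown flag; nothing in Flask sets this for you.
stop_requested = threading.Event()


def event_stream(q, timeout=0.5):
    """Yield SSE frames from q, waking up every `timeout` seconds."""
    while not stop_requested.is_set():
        try:
            # Block for at most `timeout` seconds instead of forever.
            message = q.get(timeout=timeout)
        except queue.Empty:
            # Nothing arrived in time: emit an SSE comment as a keep-alive,
            # then go around the loop and re-check the shutdown flag.
            yield ": keep-alive\n\n"
            continue
        yield "data: {}\n\n".format(message)
```

With this shape the loop never blocks longer than `timeout`, so the generator ends within roughly that time after `stop_requested.set()` is called, provided the consumer keeps iterating.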

@eliwjones

Was amusing myself today looking around for Python server side event examples and came across this.

Currently, only one of the /api/stream clients will get each message, since every call to event_stream() reads from the same shared queue.

The version below creates a queue for each subscription and broadcasts to all of them (granted, only safe for low request rates):

from queue import Queue
from flask import Flask, render_template, request, Response

app = Flask(__name__)
app.debug = True

QUEUES = []


def event_stream(q):
    while True:
        # Question: how to break out of this loop when the app is killed?
        message = q.get(True)
        print("Sending {}".format(message))
        yield "data: {}\n\n".format(message)


@app.route('/')
def hello():
    return render_template('index.html')


@app.route('/api/stream')
def stream():
    q = Queue()
    QUEUES.append(q)

    return Response(event_stream(q), mimetype="text/event-stream")


@app.route('/api/post', methods=['GET'])
def api_parse_sentence():
    for q in QUEUES:
        q.put(request.args.get('sentence'))

    return "OK"


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=3000, debug=True, threaded=True)

@jelmervdl
Author

@eliwjones That's a very fair point. Hadn't encountered that yet, but it makes total sense. Thanks for the revision.

You're also right that this won't work for more than a very simple test setup: in your example, QUEUES just keeps growing, and every queue whose client has disconnected keeps filling up with messages nobody reads. So let that be a warning to anyone wanting to use this snippet :)
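For anyone who does want to push this a bit further, here is one possible way to keep the list of queues from growing forever. It is a sketch under assumptions, not a tested Flask setup: `subscribers`, `subscribe` and `broadcast` are names I made up, and the cleanup relies on the generator being closed once the server notices the client is gone, which in practice may only happen on the next attempted write. Bounded queues are used so that a client that stops reading gets dropped instead of accumulating messages.

```python
import queue
import threading

subscribers = set()                 # hypothetical registry replacing QUEUES
subscribers_lock = threading.Lock()


def subscribe(maxsize=100):
    """Create a bounded queue for one client and register it."""
    q = queue.Queue(maxsize=maxsize)
    with subscribers_lock:
        subscribers.add(q)
    return q


def event_stream(q, timeout=0.5):
    """Yield SSE frames for one client; unregister q when the generator ends."""
    try:
        while True:
            try:
                message = q.get(timeout=timeout)
            except queue.Empty:
                yield ": keep-alive\n\n"   # SSE comment line
                continue
            yield "data: {}\n\n".format(message)
    finally:
        # Runs when a started generator is closed or garbage-collected.
        with subscribers_lock:
            subscribers.discard(q)


def broadcast(message):
    """Put message on every registered queue, dropping queues that are full."""
    with subscribers_lock:
        targets = list(subscribers)
    for q in targets:
        try:
            q.put_nowait(message)
        except queue.Full:
            with subscribers_lock:
                subscribers.discard(q)
```

In the Flask version, the stream route would call `subscribe()` and pass the result to `event_stream`, and the post route would call `broadcast(...)` instead of looping over QUEUES. One caveat: the `finally` block only runs if the generator has actually started, so a client that disconnects before the first frame is sent would only be cleaned up once its queue fills.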
