Skip to content

Instantly share code, notes, and snippets.

@limboinf
Created February 25, 2016 08:22
Show Gist options
  • Save limboinf/514d8189a6ce98d12a7b to your computer and use it in GitHub Desktop.
Save limboinf/514d8189a6ce98d12a7b to your computer and use it in GitHub Desktop.
flask SSE.
# coding=utf-8
# Make sure your gevent version is >= 1.0
import gevent
from gevent.wsgi import WSGIServer
from gevent.queue import Queue
from flask import Flask, Response
import time
# SSE "protocol" is described here: http://mzl.la/UPFyxY
class ServerSentEvent(object):
def __init__(self, data):
self.data = data
self.event = None
self.id = None
self.desc_map = {
self.data : "data", # 类型为 data,表示该事件的数据。
self.event : "event", # 表示声明事件的类型。浏览器在收到数据时,会产生对应类型的事件。
self.id : "id" # 表示声明事件的标识符。
}
def encode(self):
if not self.data:
return ""
lines = ["%s: %s" % (v, k)
for k, v in self.desc_map.iteritems() if k]
return "%s\n\n" % "\n".join(lines)
app = Flask(__name__)
subscriptions = []
# Client code consumes like this.
@app.route("/")
def index():
return """
<html>
<head>
</head>
<body>
<h1>Server sent events</h1>
<div id="event"></div>
<script type="text/javascript">
var eventOutputContainer = document.getElementById("event");
var evtSrc = new EventSource("/subscribe");
// 当成功与服务器建立连接时产生
evtSrc.onopen = function(e) {
alert("服务器连接成功");
}
// 当收到服务器发送的事件时产生
evtSrc.onmessage = function(e) {
console.log(e.data);
eventOutputContainer.innerHTML = e.data;
};
//当出现错误时产生
evtSrc.onerror = function(e) {
alert("连接异常");
}
</script>
</body>
</html>
"""
@app.route("/debug")
def debug():
return "Currently %d subscriptions" % len(subscriptions)
@app.route("/publish")
def publish():
# Dummy data - pick up from request for real data
def notify():
msg = str(time.time())
for sub in subscriptions[:]:
sub.put(msg)
gevent.spawn(notify)
return "OK"
@app.route("/subscribe")
def subscribe():
def gen():
q = Queue()
subscriptions.append(q)
try:
while True:
result = q.get()
ev = ServerSentEvent(str(result))
yield ev.encode()
except GeneratorExit: # Or maybe use flask signals
subscriptions.remove(q)
return Response(gen(), mimetype="text/event-stream")
if __name__ == "__main__":
app.debug = True
server = WSGIServer(("", 5000), app)
server.serve_forever()
# Then visit http://localhost:5000 to subscribe
# and send messages by visiting http://localhost:5000/publish
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment