Skip to content

Instantly share code, notes, and snippets.

@wearhere
Last active June 8, 2018 01:15
Show Gist options
  • Save wearhere/0afbbdfab7fee46c8aaba05d8d4470ed to your computer and use it in GitHub Desktop.
Save wearhere/0afbbdfab7fee46c8aaba05d8d4470ed to your computer and use it in GitHub Desktop.
Making SSE easy
import _ from 'underscore';
import $ from 'jquery';
import Backbone from 'backbone';
const DataCollection = Backbone.Collection.extend({
url() {
return '/api/data';
},
fetch() {
// HACK(jeff): No XHR to pass ah well.
this.trigger('request');
// We use an event source so we can stream back the results.
const evtSource = new EventSource(this.url());
const promise = new Promise((resolve, reject) => {
$(evtSource).on('message', ({ originalEvent: e }) => {
let data;
try {
{ data } = JSON.parse(e.data);
} catch (e) {
// ignore
}
if (!data) {
// Note that we need to close the source even if the server has closed the request
// as in the case of EOS, since the default browser behavior is to assume that the
// connection just dropped and try to reconnect after 3 seconds.
evtSource.close();
if (data === undefined) {
reject(_.extend(new Error('Invalid response'), {
raw: e.data
}));
} else if (data === null) {
// Expected EOS
this.trigger('sync', this, this.toJSON());
resolve();
}
return;
}
// This assumes that each instance of data should be used to hydrate a new Backbone model.
this.add(data);
}).on('error', ({ originalEvent: e }) => {
let error;
try {
error = JSON.parse(e.data);
} finally {
evtSource.close();
const err = _.extend(new Error('Stream error'), {
underlyingError: error
});
this.trigger('error', this, err);
reject(err);
}
});
});
promise.abort = () => {
evtSource.close();
// Consider the request to have finished.
this.trigger('sync', this, this.toJSON());
// ???(jeff): Should this `resolve` or `reject` the promise or just leave it hanging?
// It's just left hanging for now.
};
return promise;
}
});
export default DataCollection;
const _ = require('underscore');
function eventStreamMiddleware(writer) {
return [
// Open stream.
function(req, res, next) {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
next();
},
// Write to stream.
function(req, res, next) {
writer(req, function({ event, id, data }) {
if (event) res.write(`event: ${event}\n`);
if (id) res.write(`id: ${id}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
}, next);
},
// End stream.
function(req, res) {
// EOS.
res.write(`data: ${JSON.stringify(null)}\n\n`);
// Note that this would cause an error to be emitted on the client
// if the client did not close the stream in response to EOS.
res.end();
},
// Handle errors and end stream.
function(err, req, res, next) {
res.write('event: error\n');
res.write(`data: ${JSON.stringify(_.pick(err, ['name', 'message', 'stack']))}\n\n`);
// No 500 status code since we already wrote the head.
res.end();
}
];
}
module.exports = eventStreamMiddleware;
const eventStreamMiddleware = require('./eventStreamMiddleware');
const express = require('express');
const DataFetcher = require('made-up-module');
const router = express.Router();
router.get('/data', eventStreamMiddleware(function(req, write, done) {
let fetcher = new DataFetcher();
.on('data', (data) => write({ data }));
.on('finish', done)
.run();
// Note that we don't do any error handling here since our made-up fetcher doesn't
// experience any fatal errors. Any non-fatal errors will get logged.
// If the fetcher just isn't working the user can stop it by cancelling the request.
req.on('close', () => {
// User cancelled or closed the page or connection dropped.
fetcher.abort();
});
}));
module.exports = router;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment