Skip to content

Instantly share code, notes, and snippets.

@bobpace
Created March 18, 2015 23:35
Show Gist options
  • Save bobpace/c3b6d47e3a62aaea1169 to your computer and use it in GitHub Desktop.
Save bobpace/c3b6d47e3a62aaea1169 to your computer and use it in GitHub Desktop.
Koa.js server sent events with Rx Observables
module.exports = function eventStream() {
return function *(next) {
this.req.setTimeout(0); //no timeout
this.type ='text/event-stream; charset=utf-8';
this.set('Cache-Control', 'no-cache');
this.set('Connection', 'keep-alive');
this.set('Transfer-Encoding', 'chunked');
yield* next;
}
}
var Transform = require('stream').Transform;
var inherits = require('util').inherits;
var uuid = require('node-uuid');
var _ = require('lodash');
inherits(SseStream, Transform);
function SseStream(options) {
options = _.extend({}, options, {objectMode: true});
Transform.call(this, options);
}
SseStream.prototype._transform = function(data, enc, cb) {
var id = data.$id ? data.$id : uuid.v4();
var type = data.$type ? '/' + data.$type : '';
var payload = !_.isUndefined(data.$data) ? data.$data : _.omit(data, ['$type', '$data']);
this.push('id: ' + id + type + '\n');
this.push('data: ' + JSON.stringify(payload) + '\n\n');
cb();
}
module.exports = SseStream;
var co = require('co');
var Rx = require('rx');
var SseStream = require('../util/sse-stream');
var _ = require('lodash');
var compose = require('koa-compose');
var eventStream = require('./event-stream');
module.exports = function serverSentEvents(source) {
return compose([
eventStream(),
function *() {
var socket = this.socket;
var endOfSocket = Rx.Observable.merge(
Rx.Observable.fromEvent(socket, 'error'),
Rx.Observable.fromEvent(socket, 'close')
);
var body = this.body = new SseStream();
var observable = _.isFunction(source) ? source() : source;
observable.takeUntil(endOfSocket).subscribe(
(x) => body.write(x),
(err) => body.emit('error', err),
() => body.end()
);
}
]);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment