Skip to content

Instantly share code, notes, and snippets.

@pgk
Created November 10, 2014 00:15
Show Gist options
  • Save pgk/e5bea5a98c79e511fdd6 to your computer and use it in GitHub Desktop.
Save pgk/e5bea5a98c79e511fdd6 to your computer and use it in GitHub Desktop.
/*
* Streaming example
*/
/*
* Shared helper: turns an array-like (e.g. `arguments`) into a real array.
*/
var slice = Array.prototype.slice;
/*
* All streams respond to certain events.
*
* Event is a minimal emitter: listeners are stored per event name in
* `this.events` and invoked synchronously, in registration order, by emit().
* It is a plain constructor function, so it can also be applied to an
* existing object with `Event.call(obj)` to give that object its own
* listener storage.
*/
var Event = function () {
  // Null-prototype map: event names such as "constructor" or "toString"
  // must not resolve to members inherited from Object.prototype.
  this.events = Object.create(null);
};
/*
* Registers `cb` as a listener for `evt`.
*
* evt - event name.
* cb  - function called with the arguments passed to emit().
* ctx - optional; when truthy, `cb` is bound to it so `this` inside the
*       listener is `ctx`.
*/
Event.prototype.on = function (evt, cb, ctx) {
  var listener = ctx ? cb.bind(ctx) : cb;
  if (!this.events[evt]) {
    this.events[evt] = [];
  }
  this.events[evt].push(listener);
};
/*
* Removes every listener registered for `evt`.
*/
Event.prototype.off = function (evt) {
  delete this.events[evt];
};
/*
* Calls each listener of `evt` with the remaining arguments.
* Listeners are applied with a null `this` unless they were bound through
* the `ctx` argument of on(). Emitting an event nobody listens to is a no-op.
*/
Event.prototype.emit = function (evt) {
  var listeners = this.events[evt];
  if (!listeners) {
    return;
  }
  var args = slice.call(arguments, 1);
  listeners.forEach(function (cb) {
    cb.apply(null, args);
  });
};
/*
* Sentinel a producer function returns to signal that it has no more values.
* It is compared by identity (===), so it cannot be confused with real data.
*/
var STREAM_END = {};
/*
* Creates a stream object.
*
* A stream holds a queue of producers (`callables`) and a list of consumers
* (`pipes`). Producers are drained one after another, in the order they were
* appended; every value a producer yields is handed to every consumer.
*
*   append(fn) - queue a producer. `fn` is called repeatedly with no
*                arguments; each return value is delivered downstream until
*                it returns STREAM_END. Returns the stream for chaining.
*   pipe(fn)   - register a consumer called with each produced value.
*                Returns the stream for chaining.
*   consume()  - start draining. Work is scheduled with setTimeout, so
*                nothing is delivered synchronously.
*/
var stream = function () {
  // Inherit from Event.prototype (not from the Event function itself) so the
  // stream object really has on/off/emit.
  var s = Object.create(Event.prototype);
  Event.call(s);
  s.pipes = [];
  s.callables = [];

  // Consume loops that ran out of producers wait here; append() resumes them.
  // This replaces polling on a timer, which kept the process busy and alive
  // forever even when no producer would ever arrive.
  var parked = [];

  // Wraps a producer function: once 'init' is emitted it pulls one value per
  // timer tick, emitting 'next' for each value and 'end' on STREAM_END.
  var callableHandler = function (fn) {
    Event.call(this);
    var that = this;
    var loop = function loop() {
      var next = fn();
      if (next === STREAM_END) {
        that.emit('end');
        return;
      }
      that.emit('next', next);
      setTimeout(loop, 0);
    };
    that.on('init', function () { setTimeout(loop, 0); });
  };
  callableHandler.prototype = new Event();

  // Wraps a consumer function: 'next' carries an array of arguments that is
  // spread into the call to `fn`.
  var consumer = function (fn) {
    Event.call(this);
    this.on('next', function (args) { fn.apply(null, args); });
  };
  consumer.prototype = new Event();

  s.append = function (cb) {
    this.callables.push(new callableHandler(cb));
    // Wake any consume loop that was waiting for a producer.
    parked.splice(0).forEach(function (resume) {
      setTimeout(resume, 0);
    });
    return this;
  };

  s.pipe = function (cb) {
    this.pipes.push(new consumer(cb));
    return this;
  };

  s.consume = function () {
    var i = 0, that = this;
    var loopConsume = function loopConsume() {
      var strm = that.callables[i];
      if (!strm) {
        // Nothing queued at this position yet: wait for the next append().
        parked.push(loopConsume);
        return;
      }
      // Attach listeners before starting the producer.
      strm.on('next', function () {
        var args = slice.call(arguments);
        that.pipes.forEach(function (p) {
          p.emit('next', args);
        });
      });
      strm.on('end', function () {
        // Current producer is exhausted; move on to the next one.
        i++;
        setTimeout(loopConsume, 0);
      });
      strm.emit('init');
    };
    setTimeout(loopConsume, 0);
  };

  return s;
};
/*
* Builds a producer function (suitable for stream().append) from an array or
* array-like.
*
* array - the values to produce, in index order. A snapshot is taken, so
*         mutating `array` afterwards does not change what is produced.
*
* Returns a zero-argument function that yields one element per call and
* STREAM_END once the elements are exhausted. It keeps returning STREAM_END
* on every later call instead of running past the end and yielding
* `undefined`, which would look like a data value.
*/
stream.fromArray = function (array) {
  var items = Array.prototype.slice.call(array);
  var i = 0;
  return function () {
    if (i >= items.length) {
      return STREAM_END;
    }
    var res = items[i];
    i++;
    return res;
  };
};
// -----------------------------------------------------------------------------
// EXAMPLES
// -----------------------------------------------------------------------------
// Demo: three batches are fed into one stream, the first right away and the
// others after a delay. Every value that comes through is printed.
var s = stream();

// Register the printer and queue the first batch up front.
s.pipe(function (value) {
  console.log(value);
}).append(stream.fromArray([1, 2, 3, 4]));

// Later batches as [delay in ms, values] pairs.
[
  [500, [5, 6, 7, 8]],
  [1000, [9, 10, 11, 12, 13, 14]]
].forEach(function (batch) {
  setTimeout(function () {
    s.append(stream.fromArray(batch[1]));
  }, batch[0]);
});

// Start draining the queued producers.
s.consume();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment