Skip to content

Instantly share code, notes, and snippets.

@mrunderhill89
Last active August 29, 2015 14:16
Show Gist options
  • Save mrunderhill89/85b6ae9918c5c71e7f05 to your computer and use it in GitHub Desktop.
Save mrunderhill89/85b6ae9918c5c71e7f05 to your computer and use it in GitHub Desktop.
A simple experiment in parallelizing Bacon.js using Parallel.js. I haven't thoroughly debugged it yet, but it works most of the time.
var Bacon = require("baconjs").Bacon;
var Parallel = require('paralleljs');
var _ = require("underscore")._;
_.extend(Bacon.Observable.prototype, {
/*
Gathers events in a buffer for parallel processing.
Obviously, this needs to be done before any parallel operations are performed.
- length: the maximum number of entries collected before dumping. Set <= 0 for no limit.
- timeOut: will dump events after this many milliseconds, regardless of how many were collected. Set <= 0 for no limit.
- blocking: function that tests each value as it comes in. if true, it dumps all stored entries before sending the latest
one separately from the rest. Good for handling read/write operations.
*/
gather: function(length, timeOut, blocking){
return new Bacon.EventStream(function(sink){
var buf = [];
var timer_ID;
_.isNumber(length) || (length = 1);
_.isNumber(timeOut) || (timeOut = 100);
(length > 0 || timeOut > 0) || (length = 1);
function resetTimer(){
if (timer_ID){clearTimeout(timer_ID);}
if (timeOut > 0){
timer_ID = setTimeout(dump, timeOut);
}
};
function dump(){
if (buf.length > 0){
sink(new Bacon.Next(buf));
}
buf = [];
resetTimer();
};
resetTimer();
this.onValue(function(value){
if (_.isFunction(blocking) && blocking(value)){
dump();
sink(new Bacon.Next([value]));
} else {
buf.push(value);
if (length > 0 && buf.length >= length){
buf = buf.slice(0,length);
dump();
}
}
});
}.bind(this));
},
/*
Splits an array event into single events.
Dunno if Bacon already has this, but I'm defining it here just in case.
Note that parallelMap and parallelFilter already split the events for you.
*/
split: function(){
return new Bacon.EventStream(function(sink){
this.onValue(function(array){
if (_.isArray(array)){
_.each(array, function(value){
sink(new Bacon.Next(value));
});
} else {
sink(new Bacon.Next(array));
}
}.bind(this));
});
},
/*
Creates a Parallel.js object and feeds it values from the (hopefully prepared with .gather())
event stream.
- per_value: function that is run in parallel on every value as it's received.
Return value will alter the results.
- end: function that is run on the final results. Note that this function receives all of the results
in one array, so if you want separate values, you'll need to iterate using _.each() or similar.
- setup: used to prepare the internal Parallel.js object before running anything. Useful if you want to call
.require() or similar functions.
- returns: same as Bacon.onValue().
*/
onParallel: function(per_value, end, setup){
_.isFunction(setup) || (setup = function(p){return p});
var p = setup(new Parallel());
return this.onValue(function(data){
p.then(function(){return data})
.map(per_value)
.then(end);
});
},
/*
Parallel version of Bacon.map().
*/
parallelMap: function(fun){
return new Bacon.EventStream(function(sink){
this.onParallel(fun, function(results){
_.each(results, function(result){
if (!(_.isNull(result) || _.isUndefined(result))){
sink(new Bacon.Next(result));
} else {
sink(new Bacon.Error("Parallel Map: unexpected result:"+result);
}
});
return results;
});
}.bind(this));
},
/*
Parallel version of Bacon.filter().
*/
parallelFilter: function(test){
return new Bacon.EventStream(function(sink){
this.onParallel(
function(value){
return test(value)? value : null;
},
function(results){
_.each(results, function(result){
if (!_.isNull(result)){
sink(new Bacon.Next(result));
}
});
return results;
},
function(p){return p.require({fn:test, name:"test"})}
);
}.bind(this));
}
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment