Skip to content

Instantly share code, notes, and snippets.

@kaosat-dev
Last active August 29, 2015 14:21
Show Gist options
  • Save kaosat-dev/bf21816d9e6f1dae218f to your computer and use it in GitHub Desktop.
Save kaosat-dev/bf21816d9e6f1dae218f to your computer and use it in GitHub Desktop.
// ES7 Observables + WHATWG Streams
//
// https://github.com/jhusain/asyncgenerator
// https://github.com/whatwg/streams
//
// Continuation from file:
// https://github.com/jhusain/asyncgenerator/blob/master/src/observable.js
Observable.fromStream = function(readable) {
return new Observable(function(generator) {
var done = false;
var decoratedGenerator = decorate(generator, function() {
if (!done) {
done = true;
readable.cancel();
}
});
function pump() {
var writableReady;
// Check done before each read, observer may have canceled on us
if (done) {
readable.cancel();
return;
}
if (readable.state === "readable") {
// observer may signal backpressure by returning a promise
writableReady = decoratedGenerator.next(readable.read());
}
if (readable.state === "closed") {
done = true;
decoratedGenerator.return();
} else {
// Wait until both reader and writer are ready
Promise.all([readable.ready, writableReady]).then(pump, function(e) {
done = true;
readable.cancel();
decoratedGenerator.throw(e);
});
}
}
pump();
return decoratedGenerator;
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment