Skip to content

Instantly share code, notes, and snippets.

@wycats
Created October 26, 2013 03:50
Show Gist options
  • Save wycats/dc1400f7f54a6ae85a32 to your computer and use it in GitHub Desktop.
Save wycats/dc1400f7f54a6ae85a32 to your computer and use it in GitHub Desktop.
/**
 * Pipes values from `readable` into `writable`, pausing whenever the writable
 * asks for backpressure.
 *
 * `readable.subscribe(next, complete, error)` is expected to return a function
 * that cancels the subscription. When `writable.next(value)` returns a
 * thenable, the pipe unsubscribes from the readable and re-subscribes (by
 * calling `pipe` again) once that thenable fulfils. If the thenable rejects,
 * the reason is handed to `writable.error` instead of being left unhandled.
 *
 * @param {{subscribe: Function}} readable - Source of values.
 * @param {{next: Function, complete?: Function, error?: Function}} writable -
 *   Sink for the values.
 * @returns {void}
 */
function pipe(readable, writable) {
  // True from the moment the writable requests a pause. While set, anything
  // the readable still delivers through this subscription is ignored, which
  // is what an already-unsubscribed consumer would observe.
  let paused = false;
  // Stays null until `subscribe` returns; a readable that emits synchronously
  // will call `next` before we have anything to cancel with.
  let unsubscribe = null;

  // Wraps a writable method so it runs with `this === writable` (passing the
  // bare method reference would lose the receiver). Non-function values are
  // passed through untouched so the readable sees what it saw before.
  const bound = (method) => {
    const handler = writable[method];
    if (typeof handler !== 'function') {
      return handler;
    }
    return (...args) => (paused ? undefined : handler.apply(writable, args));
  };

  const resume = () => {
    pipe(readable, writable);
  };

  // A failed write ends the pipe; surface it through the writable's own error
  // channel, or rethrow so the rejection stays visible when there is none.
  const fail = (reason) => {
    if (typeof writable.error !== 'function') {
      throw reason;
    }
    writable.error(reason);
  };

  function next(value) {
    if (paused) {
      return;
    }
    // The writable's `next` can return a promise; if it does, unsubscribe and
    // wait for the promise to resolve before resubscribing.
    const pause = writable.next(value);
    if (!pause || typeof pause.then !== 'function') {
      return;
    }
    paused = true;
    if (typeof unsubscribe === 'function') {
      unsubscribe();
    }
    pause.then(resume, fail);
  }

  unsubscribe = readable.subscribe(next, bound('complete'), bound('error'));

  // The pause was requested while `subscribe` was still running, so the
  // cancellation had to wait until the unsubscribe function existed.
  if (paused) {
    unsubscribe();
  }
}
/**
 * Example sink for `pipe`: every incoming value is handed to
 * `someAsyncProcessing`, and whatever that call returns is given back to the
 * caller unchanged.
 */
class Writable {
  /**
   * Accepts one value from upstream.
   *
   * @param {*} value - The value to process.
   * @returns {*} The result of `someAsyncProcessing(value)`. The surrounding
   *   example expects this to be a promise, which the pipe treats as a
   *   request to stay unsubscribed from the upstream until it resolves.
   */
  next(value) {
    const pending = someAsyncProcessing(value);
    return pending;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment