Skip to content

Instantly share code, notes, and snippets.

@RReverser
Last active August 29, 2015 14:03
Show Gist options
  • Save RReverser/9888bc36f432f5a5547f to your computer and use it in GitHub Desktop.
Save RReverser/9888bc36f432f5a5547f to your computer and use it in GitHub Desktop.
var stream = require('stream');
var Rx = require('rx');
var util = require('util');
function ObservableStream(transform) {
stream.Duplex.call(this, {objectMode: true});
this.subject = new Rx.Subject();
this.observable = transform(this.subject);
this.subscription = null;
}
util.inherits(ObservableStream, stream.Duplex);
ObservableStream.prototype._read = function () {
if (this.subscription) {
return;
}
this.subscription = this.observable.subscribe(
this.push.bind(this),
this.emit.bind(this, 'error'),
this.push.bind(this, null)
);
};
ObservableStream.prototype._write = function (value, enc, callback) {
this.subject.onNext(value);
callback();
};
ObservableStream.prototype.end = function () {
this.subject.onCompleted();
this.subject.dispose();
};
var inputStream = (function (values) {
var inputStream = new stream.Readable({objectMode: true});
inputStream._read = function () {
this.push(values.shift());
};
return inputStream;
})(['1', '2', '3']);
var observableStream = new ObservableStream(function (observable) {
return observable
.concat(Rx.Observable.return('Done'))
.map(function (ch) { return '!' + ch + '\n' });
});
inputStream.pipe(observableStream).pipe(process.stdout);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment