Skip to content

Instantly share code, notes, and snippets.

@th-yoo
Last active March 22, 2017 12:10
Show Gist options
  • Save th-yoo/1e9766c22554b808e9d5785c450a916a to your computer and use it in GitHub Desktop.
Save th-yoo/1e9766c22554b808e9d5785c450a916a to your computer and use it in GitHub Desktop.
Promisify node stream
'use strict'
// (node -v) => 5.2.0
const stream = require('stream');
/**
 * Promise-style wrapper around a chain of piped Node streams.
 *
 * Every stream handed to the constructor or to `pipe()` gets its own promise
 * that fulfills on the stream's completion event and rejects on its 'error'
 * event. The Promise API (`then` / `catch` / `finally`) settles once all of
 * those per-stream promises have fulfilled, or as soon as one rejects.
 *
 * `then`, `catch` and `finally` return the wrapper itself so that stream and
 * promise calls can share one fluent chain. Each call is appended to an
 * internal promise chain, so `.then(a).catch(b).finally(c)` behaves like the
 * same chain on a native promise: `b` sees errors thrown by `a`, and a
 * rejection handled by `b` is not reported as unhandled.
 */
class StreamPromise {
  /**
   * @param {stream.Readable|stream.Writable} stream - Head of the pipeline.
   */
  constructor(stream) {
    this._promises = [];
    this._add(stream);
    this._cur = stream;

    // http://2ality.com/2014/10/es6-promises-api.html
    // http://exploringjs.com/es6/ch_promises.html
    // Installed only when the runtime has no Promise.prototype.finally.
    if (!Promise.prototype.finally) {
      Promise.prototype.finally = function (cb) {
        const P = this.constructor;
        return this.then(
          value => P.resolve(cb()).then(() => value),
          reason => P.resolve(cb()).then(() => { throw reason; })
        );
      };
    }
  }

  // N.B, Do not pipe process.stdout nor stderr
  // https://nodejs.org/api/process.html#process_a_note_on_process_i_o
  // Both of them are unexpectedly derived from Readable
  // and never emit 'finish'
  /**
   * Pipes the current tail of the chain into `stream` and makes `stream`
   * the new tail.
   *
   * @param {stream.Writable} stream - Destination stream.
   * @returns {StreamPromise} this
   * @throws {Error} If the Promise API has already been used on this wrapper.
   */
  pipe(stream) {
    if (this._p) {
      throw new Error('Can not pipe after calling Promise API');
    }
    this._add(stream);
    this._cur.pipe(stream);
    this._cur = stream;
    return this;
  }

  /**
   * Registers an event listener on the current tail stream.
   *
   * @param {string} ev - Event name.
   * @param {Function} cb - Listener.
   * @returns {StreamPromise} this
   */
  on(ev, cb) {
    this._cur.on(ev, cb);
    return this;
  }

  /**
   * Appends fulfillment / rejection handlers to the internal promise chain.
   * The first fulfillment handler in the chain receives the array produced
   * by Promise.all over the per-stream promises.
   *
   * @param {Function} [fulfilled]
   * @param {Function} [rejected]
   * @returns {StreamPromise} this
   */
  then(fulfilled, rejected) {
    // Keep the derived promise: dropping it would leave its rejection
    // unhandled and would hide errors from handlers chained afterwards.
    this._p = this._promise().then(fulfilled, rejected);
    return this;
  }

  /**
   * Appends a rejection handler to the internal promise chain.
   *
   * @param {Function} rejected
   * @returns {StreamPromise} this
   */
  catch(rejected) {
    this._p = this._promise().catch(rejected);
    return this;
  }

  /**
   * Appends a settle handler to the internal promise chain.
   *
   * @param {Function} cb
   * @returns {StreamPromise} this
   */
  finally(cb) {
    this._p = this._promise().finally(cb);
    return this;
  }

  /**
   * Returns the tail of the internal promise chain, creating the root
   * promise (all streams completed) on first use.
   *
   * @returns {Promise}
   */
  _promise() {
    if (!this._p) {
      if (this._promises.length === 0) {
        this._p = Promise.reject(new Error('No stream'));
      }
      else {
        this._p = Promise.all(this._promises);
      }
    }
    return this._p;
  }

  /**
   * Starts tracking a stream: its promise fulfills on 'finish' (Writable)
   * or 'end' (Readable) and rejects on 'error'.
   *
   * @param {stream.Readable|stream.Writable} s
   */
  _add(s) {
    const cur = new Promise((resolve, reject) => {
      let success;
      if (s instanceof stream.Writable) {
        success = 'finish';
      }
      else if (s instanceof stream.Readable) {
        success = 'end';
      }
      else {
        // Thrown inside the executor, so it surfaces as a rejection of
        // this stream's promise rather than as a synchronous exception.
        throw new Error('Unknown stream');
      }
      s.on(success, resolve).on('error', reject);
    });
    this._promises.push(cur);
  }
}
// Loaded through require(): expose the class.
// Executed directly: stream this script's own source to stdout as a demo.
if (require.main !== module) {
  module.exports = StreamPromise;
}
else {
  const fs = require('fs');

  // Writable that forwards every chunk to process.stdout. process.stdout is
  // not piped into directly; see the note above StreamPromise#pipe.
  class StdoutSink extends stream.Writable {
    constructor() {
      super({objectMode: true});
    }

    _write(chunk, encoding, done) {
      process.stdout.write(chunk);
      done();
    }
  }

  const reportSuccess = () => {
    console.error('streaming succeeded');
  };
  const reportFailure = (err) => {
    console.error('streaming failed:', err);
  };
  const reportEnd = () => {
    console.error('streaming ends');
  };

  const selfSource = fs.createReadStream(require.main.filename);
  const streaming = new StreamPromise(selfSource);
  streaming.pipe(new StdoutSink())
    .then(reportSuccess)
    .catch(reportFailure)
    .finally(reportEnd);
}
'use strict'
// Single promise + onFulfilled, onRejected
const stream = require('stream');
/**
 * Promise-style wrapper around a chain of piped Node streams, backed by a
 * single promise.
 *
 * The promise fulfills when the last stream of the chain completes and
 * rejects when any tracked stream emits 'error'. Each stream may carry an
 * optional completion value (`resolve`): if it is a function, its return
 * value becomes the fulfillment value, otherwise the value itself does.
 * Only `undefined` means "no value", so falsy values such as `0`, `''` or
 * `false` are delivered as given.
 *
 * Stream events are observed from the moment a stream is added, so the
 * outcome is still reported when `then` / `catch` / `finally` is first
 * called after the pipeline has already finished or failed.
 */
class StreamPromise {
  /**
   * @param {stream.Readable|stream.Writable} stream - Head of the pipeline.
   * @param {*} [resolve] - Completion value (or a function producing it)
   *   used if this stream is still the last one when the promise is created.
   */
  constructor(stream, resolve) {
    this._streams = [];
    this._add(stream, resolve);
    this._cur = stream;

    // http://2ality.com/2014/10/es6-promises-api.html
    // http://exploringjs.com/es6/ch_promises.html
    // Installed only when the runtime has no Promise.prototype.finally.
    if (!Promise.prototype.finally) {
      Promise.prototype.finally = function (cb) {
        const P = this.constructor;
        return this.then(
          value => P.resolve(cb()).then(() => value),
          reason => P.resolve(cb()).then(() => { throw reason; })
        );
      };
    }
  }

  // N.B, Do not pipe process.stdout nor stderr
  // https://nodejs.org/api/process.html#process_a_note_on_process_i_o
  // Both of them are unexpectedly derived from Readable
  // and never emit 'finish'
  /**
   * Pipes the current tail of the chain into `stream` and makes `stream`
   * the new tail.
   *
   * @param {stream.Writable} stream - Destination stream.
   * @param {*} [resolve] - Completion value (or a function producing it).
   * @returns {StreamPromise} this
   */
  pipe(stream, resolve) {
    this._add(stream, resolve);
    this._cur.pipe(stream);
    this._cur = stream;
    return this;
  }

  /**
   * Registers an event listener on the current tail stream.
   *
   * @param {string} ev - Event name.
   * @param {Function} cb - Listener.
   * @returns {StreamPromise} this
   */
  on(ev, cb) {
    this._cur.on(ev, cb);
    return this;
  }

  /**
   * @param {Function} [fulfilled]
   * @param {Function} [rejected]
   * @returns {Promise} Promise derived from the pipeline promise.
   */
  then(fulfilled, rejected) {
    return this._promise().then(fulfilled, rejected);
  }

  /**
   * @param {Function} rejected
   * @returns {Promise} Promise derived from the pipeline promise.
   */
  catch(rejected) {
    return this._promise().catch(rejected);
  }

  /**
   * @param {Function} cb
   * @returns {Promise} Promise derived from the pipeline promise.
   */
  finally(cb) {
    return this._promise().finally(cb);
  }

  /**
   * Lazily builds the pipeline promise from the streams added so far. The
   * stream that is last at the time of the first call decides fulfillment.
   *
   * @returns {Promise}
   */
  _promise() {
    if (!this._p) {
      const completionValue = (entry) => {
        if (entry.resolve === undefined) {
          return undefined;
        }
        return entry.resolve instanceof Function
          ? entry.resolve()
          : entry.resolve;
      };
      const last = this._streams[this._streams.length - 1];
      this._p = new Promise((resolve, reject) => {
        // Failures are wired first so that an already-recorded stream error
        // takes precedence over an already-recorded completion.
        for (const entry of this._streams) {
          entry.done.catch(reject);
        }
        // The trailing catch also covers a completion-value function that throws.
        last.done
          .then(() => resolve(completionValue(last)))
          .catch(reject);
      });
    }
    return this._p;
  }

  /**
   * Starts tracking a stream. Listeners are attached immediately so that
   * events emitted before the Promise API is first used are not lost.
   *
   * @param {stream.Readable|stream.Writable} s - Stream to track.
   * @param {*} [resolve] - Completion value (or a function producing it).
   */
  _add(s, resolve) {
    const done = new Promise((onDone, onError) => {
      const writable = s instanceof stream.Writable
        || s instanceof stream.Duplex;
      // A readable-only stream never emits 'finish'; it signals completion
      // with 'end'. Everything else keeps waiting for 'finish'.
      const success = (!writable && s instanceof stream.Readable)
        ? 'end'
        : 'finish';
      s.on(success, () => onDone());
      s.on('error', onError);
    });
    // Mark the bookkeeping promise as handled; the rejection is still
    // delivered through the promise built in _promise().
    done.catch(() => {});
    this._streams.push({ stream: s
                       , resolve: resolve
                       , done: done });
  }
}
// Demo entry point: when this file is the program's main module, stream its
// own source to stdout and report the outcome on stderr. When it is loaded
// through require(), only export the class.
const isEntryPoint = require.main === module;
if (isEntryPoint) {
  // Writable that hands each chunk to process.stdout. process.stdout is not
  // piped into directly; see the note above StreamPromise#pipe.
  class Stdout extends stream.Writable {
    constructor() {
      const options = {objectMode: true};
      super(options);
    }

    _write(data, enc, next) {
      process.stdout.write(data);
      next();
    }
  }

  const fs = require('fs');
  const pipeline = new StreamPromise(
    fs.createReadStream(require.main.filename)
  );
  const outcome = pipeline.pipe(new Stdout(), 'GOOD!!!');
  outcome
    .then((msg) => {
      console.error('streaming succeeded', msg);
    })
    .catch((err) => {
      console.error('streaming failed:', err);
    })
    .finally(() => {
      console.error('streaming ends');
    });
}
else {
  module.exports = StreamPromise;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment