-
-
Save cggaurav/a8a1309d3631dc8f7013b74b19f5d1b3 to your computer and use it in GitHub Desktop.
Experiments with Node.js Stream API.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* =============================================
    Dependencies
------------------------------------------ */

const debug = require('debug')
const { Readable, Writable, Transform } = require('stream')
const JSONStream = require('JSONStream')

/* =============================================
    Config
------------------------------------------ */

// Turn on `debug` output for every namespace that starts with "stream"
debug.enable('stream*')
/* =============================================
    Readable Stream(s)
------------------------------------------ */

/**
 * Readable "buffer mode" stream - i.e. reads `Buffer` data.
 *
 * Each chunk is the UTF-8 encoding of a JSON document `{"count":N}`. `N` runs
 * from `options.from` up to and including `options.to`, after which the
 * stream is ended with `push(null)`.
 */
class BufferReadable extends Readable {
  /**
   * @param {object} [options]
   * @param {number} [options.from=1] - first counter value to emit
   * @param {number} [options.to=100000] - last counter value to emit (inclusive)
   */
  constructor (options = {}) {
    super({})

    // `== null` (rather than `||`) so that an explicit `0` bound is honoured
    this.count = options.from == null ? 1 : options.from
    this.countMax = options.to == null ? 100 * 1000 : options.to

    // this.log = debug('stream-test BufferReadable') // much slower
    this.log = (...args) => {
      return console.log(`[${this.constructor.name}]:`, ...args)
    }
  }

  /**
   * Pushes the next counter value (or the end-of-stream marker) on a later
   * turn of the event loop via `setImmediate`.
   */
  _read () {
    // Capture the value for THIS read before advancing the counter, so the
    // end-of-stream check is made against the value actually being emitted
    // and the final `to` value is not dropped.
    const current = this.count
    this.count += 1

    setImmediate(() => {
      if (current > this.countMax) {
        return this.push(null)
      }

      const object = { count: current }
      const text = JSON.stringify(object, null, '')
      const buffer = Buffer.from(text, 'utf-8')
      const encoding = 'buffer'

      this.log('READ', `'${encoding}'`, object, '=>', buffer)
      this.push(buffer)
    })
  }
}
/**
 * Readable "object mode" stream - i.e. reads `Object` data.
 *
 * Each chunk is an object `{ count: N }`. `N` runs from `options.from` up to
 * and including `options.to`, after which the stream is ended with
 * `push(null)`.
 */
class ObjectReadable extends Readable {
  /**
   * @param {object} [options]
   * @param {number} [options.from=1] - first counter value to emit
   * @param {number} [options.to=100000] - last counter value to emit (inclusive)
   */
  constructor (options = {}) {
    super({ objectMode: true })

    // `== null` (rather than `||`) so that an explicit `0` bound is honoured
    this.count = options.from == null ? 1 : options.from
    this.countMax = options.to == null ? 100 * 1000 : options.to

    // this.log = debug('stream-test ObjectReadable') // much slower
    this.log = (...args) => {
      return console.log(`[${this.constructor.name}]:`, ...args)
    }
  }

  /**
   * Pushes the next counter object (or the end-of-stream marker) on a later
   * turn of the event loop via `setImmediate`.
   */
  _read () {
    // Capture the value for THIS read before advancing the counter, so the
    // end-of-stream check is made against the value actually being emitted
    // and the final `to` value is not dropped.
    const current = this.count
    this.count += 1

    setImmediate(() => {
      if (current > this.countMax) {
        return this.push(null)
      }

      const object = { count: current }

      this.log('READ', 'object', object, '=>', object)
      this.push(object)
    })
  }
}
/* =============================================
    Writable Stream(s)
------------------------------------------ */

/**
 * Writable "buffer mode" stream - i.e. writes `Buffer` data.
 *
 * Every chunk is logged together with its UTF-8 text, or, when the `parse`
 * option is set, together with the result of `JSON.parse` on that text.
 */
class BufferWritable extends Writable {
  /**
   * @param {object} [options]
   * @param {boolean} [options.parse=false] - log the JSON-parsed chunk instead
   *   of its raw text
   */
  constructor (options = {}) {
    super({})

    this.parse = !!options.parse

    // this.log = debug('stream-test BufferWritable') // much slower
    this.log = (...args) => {
      return console.log(`[${this.constructor.name}]:`, ...args)
    }
  }

  /**
   * @param {Buffer|string} chunk - data to write; anything that is not already
   *   a `Buffer` is converted with `Buffer.from(chunk, encoding)`
   * @param {string} encoding - encoding reported by the stream for this chunk
   * @param {Function} callback - invoked once the chunk has been logged
   */
  _write (chunk, encoding, callback) {
    const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding)
    const string = buffer.toString('utf-8')

    let decoded = string

    if (this.parse) {
      try {
        // An empty chunk is treated as JSON `null` instead of a parse error
        decoded = JSON.parse(string || 'null')
      } catch (error) {
        // A parse failure is reported in the log line rather than thrown
        decoded = error
      }
    }

    this.log('WRITE', `'${encoding}'`, buffer, '=>', decoded)

    // TODO: add back-pressure
    callback()
  }
}
/**
 * Writable "object mode" stream - i.e. writes `Object` data.
 *
 * Every written object is logged and then acknowledged immediately.
 */
class ObjectWritable extends Writable {
  /**
   * @param {object} [options] - currently unused
   */
  constructor (options = {}) {
    super({ objectMode: true })

    // this.log = debug('stream-test ObjectWritable') // much slower
    this.log = (...args) => {
      return console.log(`[${this.constructor.name}]:`, ...args)
    }
  }

  /**
   * @param {*} object - value written to the stream
   * @param {string} encoding - not used by this method
   * @param {Function} callback - invoked once the object has been logged
   */
  _write (object, encoding, callback) {
    this.log('WRITE', 'object', object, '=>', object)

    // TODO: add back-pressure
    callback()
  }
}
/* =============================================
    Transform Stream(s)
------------------------------------------ */

/**
 * Transform "buffer mode" stream - i.e. reads + writes `Buffer` data.
 *
 * Wraps the UTF-8 text of every incoming chunk as
 * `{"transform stream was here": <text>}` and pushes the result as a `Buffer`.
 */
class BufferTransform extends Transform {
  /**
   * @param {object} [options] - currently unused
   */
  constructor (options = {}) {
    super({})

    // this.log = debug('stream-test BufferTransform') // much slower
    this.log = (...args) => {
      return console.log(`[${this.constructor.name}]:`, ...args)
    }
  }

  /**
   * @param {Buffer|string} chunk - incoming data; anything that is not already
   *   a `Buffer` is converted with `Buffer.from(chunk, encoding)`
   * @param {string} encoding - encoding reported by the stream for this chunk
   * @param {Function} callback - invoked after the transformed chunk is pushed
   */
  _transform (chunk, encoding, callback) {
    const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding)
    const string = buffer.toString('utf-8')

    // Plain string wrapping keeps the output `JSON`-ish for the demo; the
    // incoming text is not validated, so this is deliberately hacky.
    const transformedString = `{"transform stream was here": ${string}}`
    const transformedBuffer = Buffer.from(transformedString, 'utf-8')

    // The most recent output is kept on the instance
    this.buffer = transformedBuffer

    this.log('TRANSFORM', `'${encoding}'`, buffer, '=>', string, '=>', transformedString, '=>', transformedBuffer)

    this.push(this.buffer)
    callback()
  }

  /**
   * @param {Function} callback - invoked once the flush has been logged
   */
  _flush (callback) {
    this.log('FLUSH')

    // Nothing to do in this scenario, but if multiple chunks were required to
    // produce one output, the final `this.push(this.buffer)` would go here.
    callback()
  }
}
/**
 * Transform "object mode" stream - i.e. reads + writes `Object` data.
 *
 * Wraps every incoming object as `{ 'transform stream was here': object }`
 * and pushes the wrapper downstream.
 */
class ObjectTransform extends Transform {
  /**
   * @param {object} [options] - currently unused
   */
  constructor (options = {}) {
    super({ objectMode: true })

    // this.log = debug('stream-test ObjectTransform') // much slower
    this.log = (...args) => {
      return console.log(`[${this.constructor.name}]:`, ...args)
    }
  }

  /**
   * @param {*} object - incoming value
   * @param {string} encoding - not used by this method
   * @param {Function} callback - invoked after the wrapped object is pushed
   */
  _transform (object, encoding, callback) {
    const transformedObject = {
      'transform stream was here': object,
    }

    this.log('TRANSFORM', 'object', object, '=>', transformedObject)

    // The most recent output is kept on the instance
    this.object = transformedObject

    this.push(this.object)
    callback()
  }

  /**
   * @param {Function} callback - invoked once the flush has been logged
   */
  _flush (callback) {
    this.log('FLUSH')

    // Nothing to do in this scenario, but if multiple `object`s were required
    // to produce one output, the final `this.push(this.object)` would go here.
    callback()
  }
}
/* =============================================
    Main
------------------------------------------ */

// Exactly one pipeline is active at a time; the commented-out lines are
// alternative wirings to try.

// 1. Buffer read/write/transform

const readableBufferStream = new BufferReadable()
const writableBufferStream = new BufferWritable()
const transformBufferStream = new BufferTransform()

// readableBufferStream.pipe(process.stdout) // will also work
// readableBufferStream.pipe(writableBufferStream)
// readableBufferStream.pipe(transformBufferStream).pipe(writableBufferStream)

// 2. Object read/write/transform

const readableObjectStream = new ObjectReadable()
const writableObjectStream = new ObjectWritable()
const transformObjectStream = new ObjectTransform()

// readableObjectStream.pipe(process.stdout) // will not work...
// readableObjectStream.pipe(JSONStream.stringify()).pipe(process.stdout) // ...unless piped to transform/serializer stream first
// readableObjectStream.pipe(writableObjectStream)

readableObjectStream.pipe(transformObjectStream).pipe(writableObjectStream)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment