Skip to content

Instantly share code, notes, and snippets.

@cggaurav
Forked from grimen/streams-test.js
Created June 18, 2018 20:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cggaurav/a8a1309d3631dc8f7013b74b19f5d1b3 to your computer and use it in GitHub Desktop.
Save cggaurav/a8a1309d3631dc8f7013b74b19f5d1b3 to your computer and use it in GitHub Desktop.
Experiments with the Node.js Stream API.
/* =============================================
Dependencies
------------------------------------------ */
const debug = require('debug')
const { Readable, Writable, Transform } = require('stream')
const JSONStream = require('JSONStream')
/* =============================================
Config
------------------------------------------ */
// Passes the namespace pattern `stream*` to `debug.enable`; the commented-out
// `debug('stream-test ...')` loggers in the classes below use namespaces that start with `stream`.
debug.enable('stream*')
/* =============================================
Readable Stream(s)
------------------------------------------ */
// Readable "buffer mode" stream - i.e. reads `Buffer` data
//
// Emits one chunk per counter value: the UTF-8 encoded JSON text `{"count":<n>}`
// for every `n` from `options.from` up to AND INCLUDING `options.to`, then ends.
//
// Options:
//   from - first counter value to emit (default: 1)
//   to   - last counter value to emit, inclusive (default: 100 * 1000)
class BufferReadable extends Readable {
  constructor (options = {}) {
    super({})

    // Fall back to the defaults only when a bound is missing, so `0` stays a usable bound.
    this.count = options.from == null ? 1 : options.from
    this.countMax = options.to == null ? 100 * 1000 : options.to

    // this.log = debug('stream-test BufferReadable') // much slower
    this.log = (...args) => {
      return console.log(`[${this.constructor.name}]:`, ...args)
    }
  }

  // Produces the next chunk asynchronously (via `setImmediate`), or ends the
  // stream with `push(null)` once the counter has moved past `countMax`.
  _read () {
    // Claim the current value BEFORE advancing the counter; the end-of-stream
    // check must look at the value about to be emitted, otherwise the final
    // value (`countMax`) is dropped.
    const current = this.count

    this.count += 1

    setImmediate(() => {
      if (current > this.countMax) {
        this.push(null)
        return
      }

      const object = {
        count: current,
      }
      const buffer = Buffer.from(JSON.stringify(object), 'utf-8')

      this.log('READ', `'buffer'`, object, '=>', buffer)
      this.push(buffer)
    })
  }
}
// Readable "object mode" stream - i.e. reads `Object` data
//
// Emits one `{ count: <n> }` object for every `n` from `options.from` up to
// AND INCLUDING `options.to`, then ends.
//
// Options:
//   from - first counter value to emit (default: 1)
//   to   - last counter value to emit, inclusive (default: 100 * 1000)
class ObjectReadable extends Readable {
  constructor (options = {}) {
    super({ objectMode: true })

    // Fall back to the defaults only when a bound is missing, so `0` stays a usable bound.
    this.count = options.from == null ? 1 : options.from
    this.countMax = options.to == null ? 100 * 1000 : options.to

    // this.log = debug('stream-test ObjectReadable') // much slower
    this.log = (...args) => {
      return console.log(`[${this.constructor.name}]:`, ...args)
    }
  }

  // Produces the next object asynchronously (via `setImmediate`), or ends the
  // stream with `push(null)` once the counter has moved past `countMax`.
  _read () {
    // Claim the current value BEFORE advancing the counter; the end-of-stream
    // check must look at the value about to be emitted, otherwise the final
    // value (`countMax`) is dropped.
    const current = this.count

    this.count += 1

    setImmediate(() => {
      if (current > this.countMax) {
        this.push(null)
        return
      }

      const object = {
        count: current,
      }

      this.log('READ', 'object', object, '=>', object)
      this.push(object)
    })
  }
}
/* =============================================
Writable Stream(s)
------------------------------------------ */
// Writable "buffer mode" stream - i.e. writes `Buffer` data
//
// Logs every chunk it receives. With `options.parse` set, the chunk text is
// run through `JSON.parse` first and the parsed value (or the parse error)
// is logged instead of the raw text.
class BufferWritable extends Writable {
  constructor (options = {}) {
    super({})

    this.parse = Boolean(options.parse)

    // this.log = debug('stream-test BufferWritable') // much slower
    this.log = (...args) => console.log(`[${this.constructor.name}]:`, ...args)
  }

  _write (chunk, encoding, callback) {
    // Non-buffer chunks are converted using the encoding handed in with them.
    let buffer = chunk

    if (!Buffer.isBuffer(chunk)) {
      buffer = Buffer.from(chunk, encoding)
    }

    const text = buffer.toString('utf-8')

    // What gets logged on the right-hand side: raw text, parsed JSON, or the parse error.
    let output = text

    if (this.parse) {
      try {
        // Empty text is parsed as the literal `null` rather than failing.
        output = JSON.parse(text || 'null')
      } catch (error) {
        output = error
      }
    }

    this.log('WRITE', `'${encoding}'`, buffer, '=>', output)

    // TODO: add back-pressure
    callback()
  }
}
// Writable "object mode" stream - i.e. writes `Object` data
//
// Logs every object it receives and immediately acknowledges the write.
class ObjectWritable extends Writable {
  constructor (options = {}) {
    super({ objectMode: true })

    // this.log = debug('stream-test ObjectWritable') // much slower
    this.log = (...args) => console.log(`[${this.constructor.name}]:`, ...args)
  }

  _write (object, encoding, callback) {
    // The object is logged on both sides of the arrow: nothing is converted here.
    const entry = ['WRITE', 'object', object, '=>', object]

    this.log(...entry)

    // TODO: add back-pressure
    callback()
  }
}
/* =============================================
Transform Stream(s)
------------------------------------------ */
// Transform "buffer mode" stream - i.e. reads + writes `Buffer` data
//
// Wraps the text of every incoming chunk as the value of a
// "transform stream was here" key and pushes the result downstream.
class BufferTransform extends Transform {
  constructor (options = {}) {
    super({})

    // this.log = debug('stream-test BufferTransform') // much slower
    this.log = (...args) => console.log(`[${this.constructor.name}]:`, ...args)
  }

  _transform (chunk, encoding, callback) {
    // Non-buffer chunks are converted using the encoding handed in with them.
    let input = chunk

    if (!Buffer.isBuffer(chunk)) {
      input = Buffer.from(chunk, encoding)
    }

    const text = input.toString('utf-8')

    // Plain string wrapping keeps the output `JSON`-ish without parsing the chunk - hacky, but enough for a test.
    const wrappedText = `{"transform stream was here": ${text}}`
    const wrappedBuffer = Buffer.from(wrappedText, 'utf-8')

    // The latest output is kept on the instance before being pushed.
    this.buffer = wrappedBuffer

    this.log('TRANSFORM', `'${encoding}'`, input, '=>', text, '=>', wrappedText, '=>', wrappedBuffer)

    this.push(this.buffer)
    callback()
  }

  _flush (callback) {
    this.log('FLUSH')

    // Nothing to do in this scenario; if several chunks were needed to build one
    // output, the final `this.push(...)` would go here.
    callback()
  }
}
// Transform "object mode" stream - i.e. reads + writes `Object` data
//
// Wraps every incoming object as the value of a "transform stream was here"
// key and pushes the wrapper downstream.
class ObjectTransform extends Transform {
  constructor (options = {}) {
    super({ objectMode: true })

    // this.log = debug('stream-test ObjectTransform') // much slower
    this.log = (...args) => console.log(`[${this.constructor.name}]:`, ...args)
  }

  _transform (object, encoding, callback) {
    const wrapped = {}

    wrapped['transform stream was here'] = object

    this.log('TRANSFORM', 'object', object, '=>', wrapped)

    // The latest output is kept on the instance before being pushed.
    this.object = wrapped
    this.push(this.object)

    callback()
  }

  _flush (callback) {
    this.log('FLUSH')

    // Nothing to do in this scenario; if several objects were needed to build one
    // output, the final `this.push(...)` would go here.
    callback()
  }
}
/* =============================================
Main
------------------------------------------ */
// 1. Buffer read/write/transform
const readableBufferStream = new BufferReadable()
const writableBufferStream = new BufferWritable()
const transformBufferStream = new BufferTransform()
// Alternative buffer pipelines (disabled) - uncomment one to run it:
// readableBufferStream.pipe(process.stdout) // will also work
// readableBufferStream.pipe(writableBufferStream)
// readableBufferStream.pipe(transformBufferStream).pipe(writableBufferStream)
// 2. Object read/write/transform
const readableObjectStream = new ObjectReadable()
const writableObjectStream = new ObjectWritable()
const transformObjectStream = new ObjectTransform()
// Alternative object pipelines (disabled) - uncomment one to run it:
// readableObjectStream.pipe(process.stdout) // will not work...
// readableObjectStream.pipe(JSONStream.stringify()).pipe(process.stdout) // ...unless piped to transform/serializer stream first
// readableObjectStream.pipe(writableObjectStream)
// Active pipeline: object source -> object transform -> object sink
readableObjectStream.pipe(transformObjectStream).pipe(writableObjectStream)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment