Skip to content

Instantly share code, notes, and snippets.

@grimen
Last active January 23, 2019 09:41
Show Gist options
  • Save grimen/765ec3851d0529178fd8785b82a07c51 to your computer and use it in GitHub Desktop.
Save grimen/765ec3851d0529178fd8785b82a07c51 to your computer and use it in GitHub Desktop.
Experiments with Node.js Stream API.
/* =============================================
Dependencies
------------------------------------------ */
const debug = require('debug')
const chalk = require('chalk')
const { Readable, Writable, Transform, pipeline } = require('stream')
const JSONStream = require('JSONStream')
/* =============================================
Config
------------------------------------------ */
debug.enable('stream*')
/* =============================================
Readable Stream(s)
------------------------------------------ */
// Readable "buffer mode" stream - i.e. reads `Buffer` data
class BufferReadable extends Readable {
constructor (options = {}) {
super({
// we are only processing max N bytes at a time - in object mode.
highWaterMark: options.highWaterMark || 16 * 1024, // 16 kB (default) - @see https://github.com/nodejs/node-v0.x-archive/blob/master/lib/_stream_readable.js#L49
})
this.count = options.from || 1
this.countMax = options.to || 100 * 1000
// this.log = debug(`stream-test ${this.constructor.name}`) // much slower
this.log = (...args) => {
return console.log(`[${this.constructor.name}]:`, ...args)
}
}
_read () {
const object = {
count: this.count,
}
const text = JSON.stringify(object, null, '')
const buffer = Buffer.from(text, 'utf-8')
const encoding = 'buffer'
this.count += 1
setImmediate(() => {
if (this.count > this.countMax) {
return this.push(null)
}
this.log('READ', `'${encoding}'`, object, '=>', buffer)
this.push(buffer)
})
}
}
// Readable "object mode" stream - i.e. reads `Object` data
class ObjectReadable extends Readable {
constructor (options = {}) {
super({
// this stream is in "object mode", meaning we expect an
// entire object to come through the stream at a time.
objectMode: true,
// we are only processing max N objects at a time - in object mode.
highWaterMark: options.highWaterMark || 16, // 16 objects (default) - @see https://github.com/nodejs/node-v0.x-archive/blob/master/lib/_stream_readable.js#L49
})
this.count = options.from || 1
this.countMax = options.to || 100 * 1000
// this.log = debug(`stream-test ${this.constructor.name}`) // much slower
this.log = (...args) => {
return console.log(`[${this.constructor.name}]:`, ...args)
}
}
_read () {
const encoding = null
const object = {
count: this.count,
}
this.count += 1
setImmediate(() => {
if (this.count > this.countMax) {
return this.push(null)
}
this.log('READ', 'object', object, '=>', object)
this.push(object)
})
}
}
/* =============================================
Transform Stream(s)
------------------------------------------ */
// Transform "buffer mode" stream - i.e. reads + writes `Buffer` data
class BufferTransform extends Transform {
constructor (options = {}) {
super({
// we are only processing max N bytes at a time - in object mode.
highWaterMark: options.highWaterMark || 16 * 1024, // 16 kB (default) - @see https://github.com/nodejs/node-v0.x-archive/blob/master/lib/_stream_readable.js#L49
})
// this.log = debug(`stream-test ${this.constructor.name}`) // much slower
this.log = (...args) => {
return console.log(`[${this.constructor.name}]:`, ...args)
}
}
_transform (chunk, encoding, callback) {
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding) // should be conditional
const string = buffer.toString('utf-8')
const transformedString = `{"transform stream was here": ${string}}` // for nicer test sticking to `JSON`-ish data, though very hacky
const transformedBuffer = Buffer.from(transformedString, 'utf-8')
this.buffer = transformedBuffer
this.log('TRANSFORM', `'${encoding}'`, buffer, '=>', string, '=>', transformedString, '=>', transformedBuffer)
this.push(this.buffer)
callback()
}
_flush (callback) {
this.log('FLUSH')
// do thing in this scenario, but if multiple `object`s are required to transform,
// then perform the `this.push(this.object)` here
callback()
}
}
// Transform "object mode" stream - i.e. reads + writes `Object` data
class ObjectTransform extends Transform {
constructor (options = {}) {
super({
// this stream is in "object mode", meaning we expect an
// entire object to come through the stream at a time.
objectMode: true,
// we are only processing max N objects at a time - in object mode.
highWaterMark: options.highWaterMark || 16, // 16 objects (default) - @see https://github.com/nodejs/node-v0.x-archive/blob/master/lib/_stream_readable.js#L49
})
// this.log = debug(`stream-test ${this.constructor.name}`) // much slower
this.log = (...args) => {
return console.log(`[${this.constructor.name}]:`, ...args)
}
}
_transform (object, encoding, callback) {
const transformedObject = {
'transform stream was here': object,
}
this.log('TRANSFORM', 'object', object, '=>', transformedObject)
this.object = transformedObject
this.push(this.object)
callback()
}
_flush (callback) {
this.log('FLUSH')
// do thing in this scenario, but if multiple `object`s are required to transform,
// then perform the `this.push(this.object)` here
callback()
}
}
/* =============================================
Writable Stream(s)
------------------------------------------ */
// Writable "buffer mode" stream - i.e. writes `Buffer` data
class BufferWritable extends Writable {
constructor (options = {}) {
super({
// we are only processing max N bytes at a time - in object mode.
highWaterMark: options.highWaterMark || 16 * 1024, // 16 kB (default) - @see https://github.com/nodejs/node-v0.x-archive/blob/master/lib/_stream_writable.js#L58
})
this.parse = !!options.parse
// this.log = debug(`stream-test ${this.constructor.name}`) // much slower
this.log = (...args) => {
return console.log(`[${this.constructor.name}]:`, ...args)
}
}
_write (chunk, encoding, callback) {
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding) // should be conditional
const string = buffer.toString('utf-8')
if (this.parse) {
let object
try {
object = JSON.parse(string || 'null')
} catch (error) {
object = error
}
this.log('WRITE', `'${encoding}'`, buffer, '=>', object)
} else {
this.log('WRITE', `'${encoding}'`, buffer, '=>', string)
}
// @see https://nodejs.org/en/docs/guides/backpressuring-in-streams
callback()
}
}
// Writable "object mode" stream - i.e. writes `Object` data
class ObjectWritable extends Writable {
constructor (options = {}) {
super({
// this stream is in "object mode", meaning we expect an
// entire object to come through the stream at a time.
objectMode: true,
// we are only processing max N objects at a time - in object mode.
highWaterMark: options.highWaterMark || 16, // 16 objects (default) - @see https://github.com/nodejs/node-v0.x-archive/blob/master/lib/_stream_writable.js#L58
})
// this.log = debug(`stream-test ${this.constructor.name}`) // much slower
this.log = (...args) => {
return console.log(`[${this.constructor.name}]:`, ...args)
}
}
_write (object, encoding, callback) {
this.log('WRITE', 'object', object, '=>', object)
// @see https://nodejs.org/en/docs/guides/backpressuring-in-streams
callback()
}
}
/* =============================================
Writable HTTP Stream(s)
------------------------------------------ */
class HTTPBufferWritable extends Writable {
constructor (options = {}) {
super({
// we are only processing max N bytes at a time - in object mode.
highWaterMark: options.highWaterMark || 16 * 1024, // 16 kB (default) - @see https://github.com/nodejs/node-v0.x-archive/blob/master/lib/_stream_writable.js#L58
})
this.parse = !!options.parse
// this.log = debug(`stream-test ${this.constructor.name}`) // much slower
this.log = (...args) => {
return console.log(`[${this.constructor.name}]:`, ...args)
}
this.uri = options.uri
this.request = require('request')
}
_write (chunk, encoding, callback) {
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding) // should be conditional
const string = buffer.toString('utf-8')
if (this.parse) {
let object
try {
object = JSON.parse(string || 'null')
} catch (error) {
object = error
}
this.log('WRITE', `'${encoding}'`, buffer, '=>', object)
this.request.post({
uri: this.uri,
headers: {
'Content-Type': 'application/json',
},
body: object,
json: true,
}, (err, res) => {
this.log('HTTP', (res.statusCode < 400) ? chalk.green(res.statusCode) : chalk.red(res.statusCode))
if (err) {
return callback(err)
}
callback()
})
} else {
this.log('WRITE', `'${encoding}'`, buffer, '=>', string)
this.request.post({
uri: this.uri,
headers: {
'Content-Type': 'text/plain',
},
body: string,
}, (err, res) => {
this.log('HTTP', (res.statusCode < 400) ? chalk.green(res.statusCode) : chalk.red(res.statusCode))
if (err) {
return callback(err)
}
callback()
})
}
}
}
class HTTPObjectWritable extends Writable {
constructor (options = {}) {
super({
// this stream is in "object mode", meaning we expect an
// entire object to come through the stream at a time.
objectMode: true,
// we are only processing max N bytes at a time - in object mode.
highWaterMark: options.highWaterMark || 16, // 16 objects (default) - @see https://github.com/nodejs/node-v0.x-archive/blob/master/lib/_stream_writable.js#L58
})
// this.log = debug(`stream-test ${this.constructor.name}`) // much slower
this.log = (...args) => {
return console.log(`[${this.constructor.name}]:`, ...args)
}
this.uri = options.uri
this.request = require('request')
}
_write (object, encoding, callback) {
this.log('WRITE', 'object', object, '=>', object)
// @see https://nodejs.org/en/docs/guides/backpressuring-in-streams
this.request.post({
uri: this.uri,
headers: {
'Content-Type': 'application/json',
},
body: object,
json: true,
}, (err, res) => {
this.log('HTTP', (res.statusCode < 400) ? chalk.green(res.statusCode) : chalk.red(res.statusCode))
if (err) {
// REVIEW: backpressure behaviour on error
return callback(err)
}
callback()
})
}
}
/* =============================================
Main
------------------------------------------ */
const express = require('express')
const expressBodyParser = require('body-parser')
const app = express()
process.env.PORT = process.env.PORT || 5000
app.use(expressBodyParser.json())
app.use('/', (req, res) => {
const data = (typeof req.body === 'string') ? req.body : JSON.stringify(req.body)
res.end(data)
})
app.listen(process.env.PORT, () => {
debug('stream test http')(`listening on ${process.env.PORT}`)
})
/* ------ */
// 1. Stream Pipe API: Buffer (read/write/transform)
const readableBufferStream = new BufferReadable({to: 1000}).on('end', process.exit)
const transformBufferStream = new BufferTransform()
const writableBufferStream = new BufferWritable()
// readableBufferStream.pipe(process.stdout) // will also work
// readableBufferStream.pipe(writableBufferStream)
// readableBufferStream.pipe(transformBufferStream).pipe(writableBufferStream)
// 2. Stream Pipe API: Object (read/write/transform)
const readableObjectStream = new ObjectReadable({to: 1000}).on('end', process.exit)
const transformObjectStream = new ObjectTransform()
const writableObjectStream = new ObjectWritable()
// readableBufferStream.pipe(process.stdout) // will not work...
// readableBufferStream.pipe(JSONStream.stringify()).pipe(process.stdout) // ...unless piped to transform/serializer stream first
// readableObjectStream.pipe(writableObjectStream)
// readableObjectStream.pipe(transformObjectStream).pipe(writableObjectStream)
// 3. Stream Pipe API: Buffer/Object network examples
const writableHTTPBufferStream = new HTTPBufferWritable({uri: `http://localhost:${process.env.PORT}/`})
const writableHTTPObjectStream = new HTTPObjectWritable({uri: `http://localhost:${process.env.PORT}/`})
// readableBufferStream.pipe(writableHTTPBufferStream)
readableObjectStream.pipe(writableHTTPObjectStream)
// 4. Stream Pipeline API
// pipeline(
// readableObjectStream,
// transformObjectStream,
// writableObjectStream,
// (err) => {
// if (err) {
// return console.error('Pipeline failed!', err)
// }
// console.log('Pipeline succeeded!')
// }
// )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment