@juanmaguitar
Forked from joyrexus/README.md
Last active February 15, 2017 08:49
Node.js streams demystified

A quick overview of the Node.js streams interface with basic examples.

This is based on @brycebaril's presentation, Node.js Streams2 Demystified.

Overview

Streams are a first-class construct in Node.js for handling data.

Think of them as lazy evaluation applied to data.

There are essentially three major concepts:

  • source - where the data comes from
  • pipeline - where you filter or transform your data as it passes through
  • sink - where your data ultimately goes
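
The three concepts map directly onto stream objects. Below is a minimal sketch, assuming Node 12 or later (for `Readable.from`) and using the options-object constructors rather than subclassing; the names `source`, `upper`, `sink`, and `received` are illustrative only.

```javascript
const { Readable, Transform, Writable, pipeline } = require("stream")

// source: emits three string chunks
const source = Readable.from(["a", "b", "c"])

// pipeline stage: upper-cases each chunk as it passes through
const upper = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})

// sink: collects whatever reaches the end of the pipeline
const received = []
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString())
    callback()
  }
})

// pipeline() connects the stages and reports any error in one place
pipeline(source, upper, sink, (err) => {
  if (err) throw err
  console.log(received.join("")) // ABC
})
```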

Benefits of using streams:

  • lazily produce or consume data in buffered chunks
  • evented and non-blocking
  • low memory footprint
  • automatically handle back-pressure (when connected with pipe())
  • buffers allow you to work around the V8 heap memory limit
  • most core Node.js content sources/sinks are streams already!

Five classes of streams:

  • Readable - sources
  • Writable - sinks
  • Duplex - both source and sink
  • Transform - in-flight stream operations
  • PassThrough - stream spy (passes data through unchanged)
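
As a rough illustration of the "stream spy" idea, a PassThrough can sit in the middle of a pipe and let you observe chunks without altering them. This sketch assumes Node 12 or later for `Readable.from`; `spy`, `seen`, `out`, and `sink` are hypothetical names.

```javascript
const { PassThrough, Readable, Writable } = require("stream")

// the spy forwards every chunk unchanged; we just listen in
const spy = new PassThrough()
const seen = []
spy.on("data", (chunk) => seen.push(chunk.toString()))

// the real destination still receives everything
const out = []
const sink = new Writable({
  write(chunk, encoding, callback) {
    out.push(chunk.toString())
    callback()
  }
})

sink.on("finish", () => console.log("spy saw:", seen, "sink got:", out))

Readable.from(["one", "two"]).pipe(spy).pipe(sink)
```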

Below is a quick overview of Readable, Writable, and Transform streams.

See also:


Readable

Use a Readable stream when supplying data as a stream.

Think: spigot/faucet.

How to implement

  1. Subclass stream.Readable.

  2. Implement a _read(size) method.

Methods

_read(size)

  • size is an advisory number of bytes and can be ignored (especially for objectMode streams)
  • _read(size) must call this.push(chunk) to send a chunk to the consumer, and this.push(null) once there is no more data

Options

  • highWaterMark number: maximum number of bytes to store in the internal buffer before ceasing to read from the underlying source (default: 16 KB)

  • encoding string: if set, buffers will be decoded to strings using this encoding instead of being passed as Buffers (default: null)

  • objectMode boolean: instead of using buffers/strings, use JavaScript objects (default: false)
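
To show how objectMode changes what `_read` pushes, here is a small sketch. `Counter` is a hypothetical class invented for this example; it ignores `size` and pushes plain objects.

```javascript
const { Readable } = require("stream")

// Hypothetical source: emits the objects {n: 1} ... {n: limit}
class Counter extends Readable {
  constructor(limit) {
    super({ objectMode: true }) // chunks are plain objects, not Buffers
    this.limit = limit
    this.n = 0
  }
  _read() { // size is ignored in objectMode
    if (this.n >= this.limit) this.push(null) // end of stream
    else this.push({ n: ++this.n })
  }
}

const items = []
const counter = new Counter(3)
counter.on("data", (obj) => items.push(obj))
counter.on("end", () => console.log(items))
// [ { n: 1 }, { n: 2 }, { n: 3 } ]
```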

How to use

  • readable.pipe(target)
  • readable.read(size)
  • readable.on("data", ... )
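
Besides `pipe()`, a readable can be consumed by pulling with `read()`. The sketch below also sets the `encoding` option so that `read()` returns strings. The no-op `read() {}` and the names `readable` and `parts` are assumptions made for the example.

```javascript
const { Readable } = require("stream")

// encoding: "utf8" makes the stream hand out strings instead of Buffers
const readable = new Readable({ encoding: "utf8", read() {} })
readable.push("hello ")
readable.push("world")
readable.push(null) // no more data

// pull style: wait for "readable", then drain the buffer with read()
const parts = []
readable.on("readable", () => {
  let chunk
  while ((chunk = readable.read()) !== null) parts.push(chunk)
})
readable.on("end", () => console.log(parts.join("")))
// hello world
```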

See also

  • stream-spigot - creates readable streams from Arrays or simple functions

Writable

Use a Writable stream when collecting data from a stream.

Think: drain/collect.

How to implement

  1. Subclass stream.Writable.

  2. Implement a _write(chunk, encoding, callback) method.

Methods

_write(chunk, encoding, callback)

  • chunk is the content to write
  • Call callback() when you're done with this chunk

Options

  • highWaterMark number: buffer level, in bytes, at which write() starts returning false (default: 16 KB)

  • decodeStrings boolean: whether to convert strings to Buffers before passing them to _write() (default: true)
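
A short sketch of what `decodeStrings: false` changes: strings written to the stream reach the write function as strings, while Buffers stay Buffers. The names `sink` and `types` are illustrative.

```javascript
const { Writable } = require("stream")

const types = []
const sink = new Writable({
  decodeStrings: false, // leave strings as strings
  write(chunk, encoding, callback) {
    types.push(typeof chunk === "string" ? "string" : "buffer")
    callback()
  }
})

sink.on("finish", () => console.log(types))
// [ 'string', 'buffer' ]

sink.write("text")
sink.write(Buffer.from("bytes"))
sink.end()
```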

How to use

  • source.pipe(sink)
  • writable.write(chunk [,encoding] [,callback])
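
`pipe()` applies back-pressure on its own. With direct `write()` calls, the same signal is carried by the return value of `write()` and the `drain` event. A minimal sketch, assuming a deliberately slow sink with a 4-byte highWaterMark (`slow` and `ok` are hypothetical names):

```javascript
const { Writable } = require("stream")

// a slow sink with a tiny buffer so the limit is easy to hit
const slow = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setImmediate(callback) // pretend the write takes a while
  }
})

// write() returns false once the buffered bytes reach highWaterMark
const ok = slow.write("123456")
console.log(ok) // false

// "drain" fires when the buffer has emptied and writing may resume
slow.once("drain", () => console.log("drained, safe to write again"))
```

A producer that respects this would stop writing when `write()` returns false and continue in the `drain` handler.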

See also

  • concat-stream - writable stream that concatenates strings or binary data and calls a callback with the result

Transform

Use a Transform stream when you want to operate on a stream in transit. This is a special kind of Duplex stream in which the output is computed from the input.

Think: filter/map.

How to implement

  1. Subclass stream.Transform.
  2. Implement a _transform(chunk, encoding, callback) method.
  3. Optionally implement a _flush(callback) method.

Methods

_transform(chunk, encoding, callback)

Call this.push(something) to forward it to the next consumer. You don't have to push anything; pushing nothing drops the chunk. You must call callback exactly once per _transform call.
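
Skipping a chunk is what makes a Transform usable as a filter. A minimal sketch in objectMode, assuming Node 12 or later for `Readable.from`; `Evens` is a hypothetical class written for this example.

```javascript
const { Readable, Transform } = require("stream")

// Hypothetical filter: forwards even numbers, drops odd ones
class Evens extends Transform {
  constructor() {
    super({ objectMode: true })
  }
  _transform(n, encoding, callback) {
    if (n % 2 === 0) this.push(n) // push nothing to skip this chunk
    callback() // exactly once per call, pushed or not
  }
}

const kept = []
const evens = new Evens()
evens.on("data", (n) => kept.push(n))
evens.on("end", () => console.log(kept)) // [ 2, 4 ]

Readable.from([1, 2, 3, 4]).pipe(evens)
```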

_flush(callback)

When the stream ends, this is your chance to do any cleanup or make last-minute this.push() calls to flush any buffered data. Call callback() when done.
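
A typical use of `_flush` is emitting data that was held back between chunks. The sketch below re-chunks text into lines; `LineSplitter` and its `tail` field are hypothetical, and it assumes the input is plain text with no multi-byte characters split across chunks.

```javascript
const { Transform } = require("stream")

// Hypothetical transform: each output chunk is one complete line
class LineSplitter extends Transform {
  constructor(options) {
    super(options)
    this.tail = "" // partial line carried over between chunks
  }
  _transform(chunk, encoding, callback) {
    const lines = (this.tail + chunk.toString()).split("\n")
    this.tail = lines.pop() // the last piece may be incomplete
    for (const line of lines) this.push(line)
    callback()
  }
  _flush(callback) {
    if (this.tail) this.push(this.tail) // emit what is left at end of input
    callback()
  }
}

const lines = []
const splitter = new LineSplitter()
splitter.on("data", (line) => lines.push(line.toString()))
splitter.on("end", () => console.log(lines))
// [ 'one', 'two', 'three' ]

splitter.write("one\ntw")
splitter.write("o\nthree")
splitter.end()
```

Without `_flush`, the final "three" would stay in `tail` and never reach the consumer.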

Options

Superset of Readable and Writable options.

How to use

  • source.pipe(transform).pipe(drain)
  • transform.on("data", ... )

See also

  • through2 - makes it easy to generate Transforms without all the subclassing boilerplate
  • through2-map - Array.prototype.map analog for streams
  • through2-filter - Array.prototype.filter analog for streams
  • through2-reduce - Array.prototype.reduce analog for streams
  • stream reducer demo - showing how to extend a Transform stream to create reducers/accumulators for streamed objects
  • sculpt - a collection of transform stream utilities (all operating in objectMode)
  • pipe-iterators - another collection of functions for iterating over object mode streams

Example: Readable source (ES2015 class)

    const Readable = require("stream").Readable

    // A Readable that serves a string in slices of at most `size` characters.
    class Source extends Readable {
      constructor(content, options) {
        super(options)
        this.content = content
      }
      _read(size) {
        if (!this.content) this.push(null) // nothing left: signal end of stream
        else {
          this.push(this.content.slice(0, size))
          this.content = this.content.slice(size)
        }
      }
    }

    const s = new Source("The quick brown fox jumps over the lazy dog.")
    console.log(s.read(10).toString())
    console.log(s.read(10).toString())
    console.log(s.read(10).toString())
    console.log(s.read(10).toString())
    console.log(s.read(10).toString())
    // The quick
    // brown fox
    // jumps over
    // the lazy
    // dog.

    const q = new Source("How now brown cow?")
    q.pipe(process.stdout)
    // How now brown cow?

Example: Readable source (ES5, util.inherits)

    var Readable = require("stream").Readable
    var inherits = require("util").inherits

    function Source(content, options) {
      Readable.call(this, options)
      this.content = content
    }

    inherits(Source, Readable)

    Source.prototype._read = function (size) {
      if (!this.content) this.push(null)
      else {
        this.push(this.content.slice(0, size))
        this.content = this.content.slice(size)
      }
    }

    var s = new Source("The quick brown fox jumps over the lazy dog.")
    console.log(s.read(10).toString())
    console.log(s.read(10).toString())
    console.log(s.read(10).toString())
    console.log(s.read(10).toString())
    console.log(s.read(10).toString())
    // The quick
    // brown fox
    // jumps over
    // the lazy
    // dog.

    var q = new Source("How now brown cow?")
    q.pipe(process.stdout)
    // How now brown cow?

Example: Transform (ES2015 class)

    const Transform = require("stream").Transform
    const Readable = require("stream").Readable

    class ToUpper extends Transform {
      constructor(options) {
        super(options)
      }
      _transform(chunk, encoding, callback) {
        const str = chunk.toString().toUpperCase()
        this.push(str)
        callback()
      }
    }

    // a simple transform stream
    const tx = new ToUpper()

    // a simple source stream; the no-op read() is there because all data
    // is pushed up front
    const rs = new Readable({ read() {} })
    rs.push("the quick brown fox ")
    rs.push("jumps over the lazy dog.\n")
    rs.push(null)

    rs.pipe(tx).pipe(process.stdout)
    // THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.

Example: Transform (ES5, util.inherits)

    var Transform = require("stream").Transform
    var Readable = require("stream").Readable
    var inherits = require("util").inherits

    function ToUpper(options) {
      Transform.call(this, options)
    }

    inherits(ToUpper, Transform)

    ToUpper.prototype._transform = function (chunk, encoding, callback) {
      var str = chunk.toString().toUpperCase()
      this.push(str)
      callback()
    }

    // a simple transform stream
    var tx = new ToUpper()

    // a simple source stream; the no-op read is there because all data
    // is pushed up front
    var rs = new Readable({ read: function () {} })
    rs.push("the quick brown fox ")
    rs.push("jumps over the lazy dog.\n")
    rs.push(null)

    rs.pipe(tx).pipe(process.stdout)
    // THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.

Example: Writable sink (ES2015 class)

    const Writable = require("stream").Writable
    const Readable = require("stream").Readable

    class Sink extends Writable {
      constructor(options) {
        super(options)
      }
      _write(data, enc, next) {
        console.log(data.toString())
        next() // signal that this chunk has been handled
      }
    }

    // a simple source stream; the no-op read() is there because all data
    // is pushed up front
    const source = new Readable({ read() {} })
    source.push("the quick brown fox ")
    source.push("jumps over the lazy dog.\n")
    source.push(null)

    const sink = new Sink()
    // "finish" fires once everything written has been handled
    sink.on("finish", function () {
      console.log("writing finished!")
    })

    source.pipe(sink)

Example: Writable sink (ES5, util.inherits)

    var Writable = require("stream").Writable
    var Readable = require("stream").Readable
    var inherits = require("util").inherits

    function Sink(options) {
      Writable.call(this, options)
    }

    inherits(Sink, Writable)

    Sink.prototype._write = function (chunk, encoding, callback) {
      console.log(chunk.toString())
      callback()
    }

    // a simple source stream; the no-op read is there because all data
    // is pushed up front
    var source = new Readable({ read: function () {} })
    source.push("the quick brown fox ")
    source.push("jumps over the lazy dog.\n")
    source.push(null)

    var sink = new Sink()
    source.pipe(sink)