@kharandziuk
Last active March 2, 2021 03:41
Node.js Streams and Reactive Programming Primer

This article shows how to apply Node.js Streams and a bit of reactive programming to a real(tm) problem. It is meant to be highly practical and is aimed at an intermediate reader, so I intentionally omit some basic explanations. If something is unclear, check the API documentation or one of its retellings (e.g. this one).

So, let's start with the problem description. We need to implement a simple web scraper which grabs all the data from some REST API, processes the data somehow and inserts it into our database. For simplicity, I omit the details of the actual database and REST API (in real life it was the API of a travel fare aggregator website and a Pg database).

Suppose we have two functions (the code of the I/O simulator functions and the rest of the article's code is here):

getAPI(n, count) // pseudo API call. Returns a promise which resolves with a list of `count` entities, starting from the n-th
insertDB(entries) // pseudo DB insert. Returns a promise which resolves when the write to the database has completed

// and some examples of calls:
getAPI(0, 2).then(console.log) // [{ id: 0}, {id: 1}]

getAPI(LAST_ITEM_ID, 1000).then(console.log) // [{id: LAST_ITEM_ID}]: this implies one important fact, there is no easy way to count the entities in the API
// The max value for the `count` argument is 1000. If we ask for more, the function returns 1000 items

insertDB([{id: 0}]).then(console.log) // { count: 1 }

For this article we will ignore the errors which can occur. Maybe I will cover them in the next article.

There are also some requirements for the data processing. Let's say that we need to remove all the items whose id contains the digit 3, and to extend with the current timestamp all the items whose id contains the digit 9 (e.g.: {id: 9} -> {id: 9, timestamp: 1490571732068}). These requirements are silly, but similar to the ones which appear in real web scrapers.
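The two rules can be written as one small pure function before any streams are involved. This is only a sketch in plain JavaScript: `processItems` and its injectable `now` parameter are names assumed here for illustration and are not part of the original code.

```javascript
// Hypothetical helper (not from the original article): applies both rules to a list.
function processItems(items, now = Date.now) {
  return items
    // Rule 1: drop every item whose id contains the digit 3.
    .filter(item => !String(item.id).includes('3'))
    // Rule 2: add a timestamp to every item whose id contains the digit 9.
    .map(item =>
      String(item.id).includes('9') ? { ...item, timestamp: now() } : item
    )
}

// A fixed clock keeps the example deterministic.
const sample = [{ id: 8 }, { id: 9 }, { id: 13 }, { id: 39 }, { id: 90 }]
const processed = processItems(sample, () => 1490571732068)
console.log(processed)
```

An id such as 39 matches both rules; because the filter runs first, the item is dropped rather than timestamped.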

Time to get our hands dirty! A naive implementation can look like this:

function grab(offset = 0, rows = 1000) {
  // request `rows` items per call (the API caps this at 1000)
  return getAPI(offset, rows).then((items) => {
      if(_.isEmpty(items)) {
        return
      } else {
        return insertDB(items).then(() => grab(offset + rows))
      }
    })
}

console.time('transition')
grab().then(() => {
  console.timeEnd('transition')
})

What are the problems with such code?

  1. It's hard to understand what the code does at first glance. We can improve it with some comments, but it's better to show in the code itself that we are reading the data in one place and writing it in another.
  2. It's too specific. It's hard to add logic for processing the values. How can we separate the processing logic from the I/O logic?
  3. It's recursive. Each step is chained onto the previous one, so on a long run the chain of pending promises can keep growing (the call stack itself is not the limit here, because every call happens in a fresh promise callback). We can fix it with e.g. an iterative approach, but with plain promises that tends to make the code even less readable.
  4. It isn't efficient. Our producer waits until the consumer has used a portion of data, instead of buffering several portions and writing them at once.
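For reference, the iterative approach from point 3 can be sketched with async/await, which needs a Node.js version that supports it. The name `grabAll` and the in-memory `fakeGetAPI` / `fakeInsertDB` stand-ins are assumptions made so the block runs on its own. Note that it still has problems 2 and 4: processing would be mixed with I/O, and every read waits for the previous write.

```javascript
// Hypothetical in-memory stand-ins for getAPI/insertDB (assumed for this sketch).
const TOTAL_ITEMS = 5
const fakeGetAPI = async (offset, count) => {
  const page = []
  for (let id = offset; id < Math.min(offset + count, TOTAL_ITEMS); id++) {
    page.push({ id })
  }
  return page
}
const stored = []
const fakeInsertDB = async (entries) => {
  stored.push(...entries)
  return { count: entries.length }
}

// Iterative version of the naive grabber: a loop instead of recursion.
async function grabAll(getPage, insert, rows = 1000) {
  let total = 0
  for (let offset = 0; ; offset += rows) {
    const items = await getPage(offset, rows)
    if (items.length === 0) return total // an empty page means we are done
    await insert(items) // the next read starts only after this write ends
    total += items.length
  }
}

const done = grabAll(fakeGetAPI, fakeInsertDB, 2)
done.then(total => console.log('items copied:', total))
```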

So, you may have already guessed that there is a better way to solve this problem: Node Streams. Let's split our problem into three parts: input, output and processing.

So, our Readable Stream can look like this:

const _ = require('lodash') // used below for _.isEmpty
const {Writable, Readable} = require('stream')
const {getAPI, insertDB} = require('./io-simulators')
const ROWS = 1000

class APIReadable extends Readable {
  constructor(options) {
    super({objectMode: true})
    this.offset = 0
  }

  _read(size) {
    getAPI(this.offset, ROWS).then(data => {
      if(_.isEmpty(data)) {
        this.push(null)
      } else {
        this.push(data)
      }
    })
    this.offset = this.offset + ROWS
  }
}

It looks a little bit heavy, but it's a standard interface. As a chunk of data we use a list of up to a thousand items. The important thing here is objectMode: true, because we want to operate on objects instead of binary data.
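To see what "a list as a chunk" means in object mode, here is a minimal sketch that uses only the built-in stream module. The in-memory `pages` array stands in for the API and is an assumption for illustration; unlike the promise-based `getAPI`, it pushes synchronously, which lets the result be inspected right away.

```javascript
const { Readable } = require('stream')

// Hypothetical in-memory pages standing in for API responses.
const pages = [[{ id: 0 }, { id: 1 }], [{ id: 2 }]]

const pagedReadable = new Readable({
  objectMode: true, // each chunk is a JavaScript value, here a whole page (array)
  read() {
    // Push the next page, or null to signal the end of the stream.
    this.push(pages.length > 0 ? pages.shift() : null)
  }
})

// In paused mode each read() call hands back exactly one chunk (one page).
const firstPage = pagedReadable.read()
const secondPage = pagedReadable.read()
const afterEnd = pagedReadable.read() // null once the stream has ended
console.log(firstPage, secondPage, afterEnd)
```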

Now the output part. We need to implement the Writable Stream. Something like this:

class DBWritable extends Writable {
  constructor(options) {
    super({highWaterMark: 5, objectMode: true});
  }

  _write(chunk, encoding, callback) {
    insertDB(chunk).asCallback(callback)
  }

  _writev(chunks, callback) {
    const entries = _.map(chunks, 'chunk')
    insertDB(_.flatten(entries)).asCallback(callback) // I'm using bluebird-promises
  }
}

Some important bits here:

  1. objectMode – we want to operate on objects
  2. highWaterMark – the size of our buffer... in objects. Be careful with it, because there is no direct connection between the number of objects and their real size in bytes. In our case one object is a list of some size
  3. _writev – 'explains' how to write more than one chunk from the buffer at a time.
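The interplay of highWaterMark and _writev can be observed with a small sketch built only on the stream module. The `batches` array plays the role of the database and `batchingWritable` is a hypothetical name; cork() and uncork() are used here only to force buffering without a slow consumer.

```javascript
const { Writable } = require('stream')

const batches = [] // every array handed to the "database" in one call
const batchingWritable = new Writable({
  objectMode: true,
  highWaterMark: 2, // counted in chunks (pages), not in bytes
  write(page, encoding, callback) {
    batches.push(page) // a single pending page
    callback()
  },
  writev(chunks, callback) {
    // chunks is an array of { chunk, encoding } records; merge their pages.
    batches.push(chunks.flatMap(record => record.chunk))
    callback()
  }
})

// cork() forces buffering so the effect is visible without a slow database.
batchingWritable.cork()
const accepted = [
  batchingWritable.write([{ id: 0 }, { id: 1 }]),
  batchingWritable.write([{ id: 2 }])
]
batchingWritable.uncork() // flushes both buffered pages through writev at once
console.log(accepted, batches)
```

The second write() returns false because the buffer has reached highWaterMark. That return value is the back-pressure signal: pipe() reacts to it by pausing the source until the writable emits 'drain'.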

And now we can connect them:

const dbWritable = new DBWritable()
const apiReadable = new APIReadable()

apiReadable.pipe(dbWritable)

From my perspective, the code doesn't require any comments. We show the data flow and hide the implementation details until you ask for them. Also, it's definitely efficient:

  1. it doesn't block the event loop
  2. it makes extensive use of standard library primitives, which should be efficient
  3. it uses buffering and back-pressure
And for dessert, we will implement the data processing. We could implement a Transform stream manually, but there is no fun in that. We will use a library called Highland.js, which lets us use well-known functional programming primitives (.filter, .map, etc.) with our streams. Actually, Highland.js is much bigger than just .map and .filter, but I don't want to blur the scope of the article. So, the transformation can look like this:

H(apiReadable)
  .flatten()
  .reject(x => _.includes(String(x.id), 3))
  .map(function(x) {
    if(_.includes(String(x.id), 9)) {
      return _.extend(x, {timestamp: Date.now()})
    } else {
      return x
    }
  })
  .batchWithTimeOrCount(100, 1000)
  .pipe(dbWritable)

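It's much the same as working with plain JS lists. We need .flatten() and .batchWithTimeOrCount(100, 1000) because our streams operate on lists of items instead of individual items: .flatten() turns the lists into a stream of single items, and .batchWithTimeOrCount(100, 1000) groups them back into lists (every 100 ms or every 1000 items, whichever comes first) before they reach the database.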
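For comparison, the hand-written Transform mentioned earlier can be sketched with the built-in stream module. It processes each page as a whole, so no flattening or re-batching is needed, at the cost of uneven page sizes after filtering. `PageTransform` is a hypothetical name, and only the "drop ids containing 3" rule is shown.

```javascript
const { Transform } = require('stream')

// Hypothetical Transform (not from the article): filters every page it receives.
class PageTransform extends Transform {
  constructor() {
    super({ objectMode: true }) // pages in, pages out
  }

  _transform(page, encoding, callback) {
    const kept = page.filter(item => !String(item.id).includes('3'))
    // Passing a value as the second argument pushes it downstream.
    callback(null, kept)
  }
}

const pageTransform = new PageTransform()
pageTransform.write([{ id: 2 }, { id: 3 }, { id: 4 }, { id: 13 }])
const filteredPage = pageTransform.read()
console.log(filteredPage)
```

Under these assumptions it would sit between the two streams as apiReadable.pipe(new PageTransform()).pipe(dbWritable).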

So, that's all. I hope this article gives you some motivation to learn Node Streams and Highland.js.

// Complete crawler script: the stream classes plus the Highland.js pipeline
const H = require('highland')
const Promise = require('bluebird')
const _ = require('lodash')
const log = require('debug')('clawler')

const {Writable, Readable} = require('stream')
const {getAPI, insertDB} = require('./io-simulators')
const ROWS = 1000

class APIReadable extends Readable {
  constructor(options) {
    super({objectMode: true})
    this.offset = 0
  }

  _read(size) {
    getAPI(this.offset, ROWS).then(data => {
      if(_.isEmpty(data)) {
        this.push(null)
      } else {
        this.push(data)
      }
    })
    this.offset = this.offset + ROWS
  }
}

class DBWritable extends Writable {
  constructor(options) {
    super({highWaterMark: 5, objectMode: true});
  }

  _write(chunk, encoding, callback) {
    insertDB(chunk).asCallback(callback)
  }

  _writev(chunks, callback) {
    const entries = _.map(chunks, 'chunk')
    insertDB(_.flatten(entries)).asCallback(callback)
  }
}

const dbWritable = new DBWritable()
const apiReadable = new APIReadable()

H(apiReadable)
  .flatten()
  .reject(x => _.includes(String(x.id), 3))
  .map(function(x) {
    if(_.includes(String(x.id), 9)) {
      return _.extend(x, {timestamp: Date.now()})
    } else {
      return x
    }
  })
  .batchWithTimeOrCount(100, 1000)
  .pipe(dbWritable)

// I/O simulators: the module required elsewhere as './io-simulators'
const _ = require('lodash')
const Promise = require('bluebird')
const log = require('debug')('io-simulators')

const DELAY = 1
const MAX_ID = 5600 // Completely random number:)

function getAPI(i, count) {
  count = Math.min(count, 1000)
  log('start reading', i)
  return Promise.delay(DELAY).then(() => {
    const range = _.range(i, i + count)
    return _(range).reject(x => x > MAX_ID).map(x => ({id: x})).value()
  })
  .tap(entries => {
    if(_.isEmpty(entries)) {
      log(`---> read empty`)
    } else {
      log(`---> read from ${_.first(entries).id} till ${_.last(entries).id}`)
    }
  })
}

function insertDB(entries) {
  log(`start write from ${_.first(entries).id} till ${_.last(entries).id}`)
  return Promise.delay(DELAY * 5).tap(() => {
    log(`<--- write from ${_.first(entries).id} till ${_.last(entries).id}`)
  })
}

module.exports = {getAPI, insertDB}

// Naive recursive implementation, complete script
const _ = require('lodash')
const log = require('debug')('clawler')

const {getAPI, insertDB} = require('./io-simulators')

function grab(offset = 0, rows = 1000) {
  // request `rows` items per call (the API caps this at 1000)
  return getAPI(offset, rows).then((items) => {
    if(_.isEmpty(items)) {
      return
    } else {
      return insertDB(items).then(() => grab(offset + rows))
    }
  })
}

console.time('transition')
grab().then(() => {
  console.timeEnd('transition')
})

ivan-kleshnin commented Apr 10, 2017

Looks a little bit heavy, but it's a standard interface.

No inheritance and this refs, basic closure:

function APIReadable() {
  let offset = 0
  return new Readable({
    objectMode: true,
    read() { // -- no underscore --
      getAPI(offset, ROWS).then(data => { // use the closure variable; this.offset is never set here
        if(_.isEmpty(data)) {
          this.push(null)
        } else {
          this.push(data)
        }
      })
      offset = offset + ROWS
    } 
  })
}

@kharandziuk
Author

Cool, thx. Also check the generators at http://highlandjs.org/#_(source); they can be an even shorter way to create a stream.