Skip to content

Instantly share code, notes, and snippets.

@kharandziuk
Last active March 31, 2017 14:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kharandziuk/dfe74a6887ce3ec14c0daeba1d0a0182 to your computer and use it in GitHub Desktop.
Save kharandziuk/dfe74a6887ce3ec14c0daeba1d0a0182 to your computer and use it in GitHub Desktop.
Node.js Streams и реактивное программирование

В этой статье мы попробуем решить реальную проблему при помощи Node.js Stream и чуточку Reactive Programming. В последнем не уверен – RP, в какой-то мере, "жупел"(как перевести buzzword?) о котором все говорят, но никто не "делает". Статья рассматривает практический пример и ориентирована на знакомого с платформой читателя, по-этому намеренно не объясняет базовые понятия – если что-то непонятно по Stream API, то стоит обратится в документацию платформы или в какой-нибудь ее пересказ(например, этот).

Начнем с описания проблемы: нам нужно построить “паучка” который заберет все данные с “чужого” REST API, как-то их обработает и запишет в “нашу” базу данных. Для удобства воспроизведения и моделирования мы опустим некоторые детали о конкретном API и базе данных(в реальности это было API одного известного стартапа связанного с гостиницами и Postgres база данных). Представим что у нас есть две функции(код функций как и весь код из статьи можно найти тут):

getAPI(n, count) // функция псевдо-чтения из API. Возвращает нам promise который разрешится списком длинной count элементов начиная с n-го

insertDB(entries) // Функция псевдо-записи в базу данных. Возвращает promise который будет разрешен когда запись в базу выполнена

//Рассмотрим, пару примеров вызова этих функций:
getAPI(0, 2).then(console.log) // [{ id: 0}, {id: 1}]

getAPI(LAST_ITEM_ID, 1000).then(console.log) // [{id: LAST_ITEM_ID}] – отсюда вытекает важная особенность мы не можем просто узнать сколько сущностей содержит API.
// Максимальное значения для count равно 1000: если мы запросим 1001, то нам все равно вернется максимум  1000 сущностей

insertDB([{id: 0}]).then(console.log) // { count: 1 }

Мы намеренно проигнорируем обработку ошибок возможных при работе с API и базой, для простоты. Если возникнет интерес, то рассмотрим их в отдельной статье.

Ну и для того чтобы было не скучно скажем что наш заказчик извращенец и он поставил следующую задачу: мы не хотим видеть у себя в база все сущности id которых содержит число 3. А сущности id которых содержат число 9 хотим дополнить текущим значением timestamp: {id: 9} -> {id: 9, timestamp: 1490571732068}. Чуть притянуто за уши, но похоже на задачи обработки и фильтрации, которые приходится решать в подобных “пауках”.

Ну что же – начнем. Давайте попробуем решить данную задачу “в лоб”. Скорее всего мы закончим с кодом чем-то похожим на этот:

function grab(offset = 0, rows = 1000) {
  offset = offset
  return getAPI(offset, 1000).then((items) => {
      if(_.isEmpty(items)) {
        return
      } else {
        return insertDB(items).then(() => grab(offset + rows))
      }
    })
}

console.time('transition')
grab().then(() => {
  console.timeEnd('transition')
})

Что не так с данным кодом?

  1. Бегло прочитав код сложно понять что он делает. Это можно поправить добавив комментариев, но все же хотелось бы на уровне кода дать "читателю" понять что мы откуда-то читаем и куда-то пишем.
  2. Он слишком специфичный – представьте что мы добавим код обработти значений. Куда мы его добавим?
  3. Он рекурсивный – а значит при достаточно большом количестве сущностей в API мы получимм ошибку. Лечится переписыванием на do...while, но вряд-ли это сделает код более читаемым
  4. Он непроизводительный. Представим что наш источник данных данных работает намного быстрее чем потребитель – в этой ситуации нам бы хотелось аггрегировать данные в некий буфер и, по возможности, записывать их за один раз

Как вы уже догадались, данную задачу легко решить при помощи Streams. Для начала разобъем эту задачу на две подзадачи: чтение и запись.

Начнем с чтения, давайте попробуем выполнить наш ReadableStream:

const {Writable, Readable} = require('stream')
const {getAPI, insertDB} = require('./io-simulators')
const ROWS = 1000

class APIReadable extends Readable {
  constructor(options) {
    super({objectMode: true})
    this.offset = 0
  }

  _read(size) {
    getAPI(this.offset, ROWS).then(data => {
      if(_.isEmpty(data)) {
        this.push(null)
      } else {
        this.push(data)
      }
    })
    this.offset = this.offset + ROWS
  }
}

Выглядит чуть более громоздким. Стоит обратить внимание на objectMode: true – мы хотим оперировать объектами, а значит стоит передать этот флаг конструктору.

Окей, теперь перейдем к записи. Имплементируем наш Writable stream. Что-то вроде этого:

class DBWritable extends Writable {
  constructor(options) {
    super({highWaterMark: 5, objectMode: true});
  }

  _write(chunk, encoding, callback) {
    insertDB(chunk).asCallback(callback)
  }

  _writev(chunks, callback) {
    const entries = _.map(chunks, 'chunk')
    insertDB(_.flatten(entries)).asCallback(callback) // я использую Bluebird-promises, и вам рекомендую
  }
}

На что стоит обратить внимание:

  1. objectMode - как и с Readable stream мы хотим оперировать объектами, а не бинарными данными
  2. highWaterMark – размер нашего буфера. Тут стоит быть аккуратным, мы задаем размер буфера в объектах и это никак не связано с реальной размерностью(битами–байтами). Например: в нашем случае мы оперируем списками.
  3. _writev – опиcываейт как обрабовать несколько "кусков" данных из буфера за раз

Ну и теперь используем наш код вот так:

const dbWritable = new DBWritable()
const apiReadable= new APIReadable()

apiReadable.pipe(dbWritable)

Как мне кажется – это очень круто, теперь из кода предельно ясно что мы читаем из одного места и пишем в другое. Кроме того читатель может проверить что наш код работает очень эффективно и использует буфер. Ну и всякие мелкие плюшки вроде того что он не блокирует event-loop.

Хм –, спросит внимательный читатель, – а что же с обрабоктой данных? Для этого мы можем написать еще один Transform stream, но это как-то "плоско и скучно", по-этому мы используем библиотеку Highland.js которая позволит нам применить всеми любимые filter и map над эллементами нашего "потока" сущностей. Вообще, Highland это что-то больше чем этот простой usecase, но это тема отдельной и не маленькой статьи. Как-то так:

H(apiReadable)
  .flatten()
  .reject(x => _.includes(String(x.id), 3))
  .map(function(x) {
    if(_.includes(String(x.id), 9)) {
      return _.extend(x, {timestamp: Date.now()})
    } else {
      return x
    }
  })
  .batchWithTimeOrCount(100, 1000)
  .pipe(dbWritable)

Как по мне, очень похоже на операции со списками и читаемо. А .flatten() и .batchWithTimeOrCount(100, 1000) нужны нам только потому что наши Streamы оперирует массивами вместо отдельных объектов.

Вот сообственно и все. Надеюсь я достиг своей цели и заинтересовал читателя в изучении Stream и Highland.js

NB: Если вам понравилась статья перейдите по ссылке и прогосолуйте за мой доклад на Polyconf.

const H = require('highland')
const Promise = require('bluebird')
const _ = require('lodash')
const log = require('debug')('clawler')
const {Writable, Readable} = require('stream')
const {getAPI, insertDB} = require('./io-simulators')
const ROWS = 1000
class APIReadable extends Readable {
constructor(options) {
super({objectMode: true})
this.offset = 0
}
_read(size) {
getAPI(this.offset, ROWS).then(data => {
if(_.isEmpty(data)) {
this.push(null)
} else {
this.push(data)
}
})
this.offset = this.offset + ROWS
}
}
class DBWritable extends Writable {
constructor(options) {
super({highWaterMark: 5, objectMode: true});
}
_write(chunk, encoding, callback) {
insertDB(chunk).asCallback(callback)
}
_writev(chunks, callback) {
const entries = _.map(chunks, 'chunk')
insertDB(_.flatten(entries)).asCallback(callback)
}
}
const dbWritable = new DBWritable()
const apiReadable= new APIReadable()
H(apiReadable)
.flatten()
.reject(x => _.includes(String(x.id), 3))
.map(function(x) {
if(_.includes(String(x.id), 9)) {
return _.extend(x, {timestamp: Date.now()})
} else {
return x
}
})
.batchWithTimeOrCount(100, 1000)
.pipe(dbWritable)
const _ = require('lodash')
const Promise = require('bluebird')
const log = require('debug')('io-simulators')
const DELAY = 1
const MAX_ID = 5600// Completely random number:)
function getAPI(i, count) {
count = Math.min(count, 1000)
log('start reading', i)
return Promise.delay(DELAY).then(() => {
const range = _.range(i, i + count)
return _(range).reject(x => x > MAX_ID).map(x => ({id: x})).value()
})
.tap(entries => {
if(_.isEmpty(entries)) {
log(`---> read empty`)
} else {
log(`---> read from ${_.first(entries).id} till ${_.last(entries).id}`)
}
})
}
function insertDB(entries) {
log(`start write from ${_.first(entries).id} till ${_.last(entries).id}`)
return Promise.delay(DELAY * 5).tap(() => {
log(`<--- write from ${_.first(entries).id} till ${_.last(entries).id}`)
})
}
module.exports = {getAPI, insertDB}
const _ = require('lodash')
const log = require('debug')('clawler')
const {getAPI, insertDB} = require('./io-simulators')
function grab(offset = 0, rows = 1000) {
offset = offset
return getAPI(offset, 1000).then((items) => {
if(_.isEmpty(items)) {
return
} else {
return insertDB(items).then(() => grab(offset + rows))
}
})
}
console.time('transition')
grab().then(() => {
console.timeEnd('transition')
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment