Skip to content

Instantly share code, notes, and snippets.

@markbirbeck
Created October 29, 2018 12:04
Show Gist options
  • Save markbirbeck/e4f78ecf31c84b045af338c39e7b0aec to your computer and use it in GitHub Desktop.
Save markbirbeck/e4f78ecf31c84b045af338c39e7b0aec to your computer and use it in GitHub Desktop.
const stream = require('stream')
/**
 * A writable stream that sends every chunk it receives to ElasticSearch.
 *
 * Single chunks are sent with the client's `index()` method; batches handed
 * over by the stream machinery via `_writev()` are sent with `bulk()`.
 *
 * NOTE(review): `elasticsearch` is referenced below but is not required in
 * the visible part of this file; it must be in scope (e.g. required at the
 * top of the file) or the constructor throws a ReferenceError.
 *
 * NOTE(review): chunks are passed to the client unchanged as document
 * bodies, so the stream is presumably meant to be created with
 * `{ objectMode: true }` in `options` -- confirm against callers.
 */
class ElasticSearchWritableStream extends stream.Writable {
  /**
   * @param {object} config Connection and target settings.
   * @param {string} config.host Passed to the client as `host`.
   * @param {string} config.index Index name sent with every request.
   * @param {string} config.type Type name sent with every request.
   * @param {object} [options] Passed through to `stream.Writable`.
   */
  constructor(config, options) {
    super(options)
    this.config = config

    /**
     * Create the ElasticSearch client:
     */
    this.client = new elasticsearch.Client({
      host: this.config.host
    })
  }

  /**
   * Close the ElasticSearch client when the stream is destroyed.
   *
   * The stream machinery only emits 'close' (and the error given to
   * `destroy(err)`) once `callback` has been invoked, so it is always
   * called, whether `client.close()` returns nothing, returns a promise
   * or throws.
   *
   * @param {Error|null} err Error the stream is being destroyed with, if any.
   * @param {function(Error=)} callback Signals that teardown has finished.
   */
  _destroy(err, callback) {
    let closing
    try {
      closing = this.client.close()
    } catch (closeErr) {
      /**
       * The error that caused the destroy takes precedence over a
       * failure to close the client:
       */
      callback(err || closeErr)
      return
    }

    /**
     * Promise.resolve() lets one code path serve both a synchronous
     * close() and one that returns a promise:
     */
    Promise.resolve(closing).then(
      () => callback(err),
      closeErr => callback(err || closeErr)
    )
  }

  /**
   * When writing a single record, we use the index() method of
   * the ES API.
   *
   * @param {*} body The chunk, sent unchanged as the document body.
   * @param {string} enc Chunk encoding (unused).
   * @param {function(Error=)} next Called once, with the error if indexing failed.
   */
  async _write(body, enc, next) {
    try {
      await this.client.index({
        index: this.config.index,
        type: this.config.type,
        body
      })
    } catch (err) {
      next(err)
      return
    }

    /**
     * Signal readiness outside the try block so that an exception thrown
     * by next() itself can never lead to next() being called twice:
     */
    next()
  }

  /**
   * When several records are buffered, send them in one bulk() request.
   *
   * @param {Array<{chunk: *}>} chunks Buffered writes; only `chunk` is used.
   * @param {function(Error=)} next Called once, with an error if the request
   *   failed or if the response reports that any item was rejected.
   */
  async _writev(chunks, next) {
    /**
     * Each entry to the bulk API comprises an instruction (like 'index'
     * or 'delete') and some data:
     */
    const body = []
    for (const entry of chunks) {
      body.push({ index: { } })
      body.push(entry.chunk)
    }

    let response
    try {
      response = await this.client.bulk({
        index: this.config.index,
        type: this.config.type,
        body
      })
    } catch (err) {
      next(err)
      return
    }

    /**
     * A bulk request can resolve successfully while individual items were
     * rejected; that is reported through `errors` and `items` in the
     * response. Some client versions wrap the response in `{ body }`, so
     * accept both shapes:
     */
    const result = (response && response.body) || response
    if (result && result.errors) {
      const failures = (result.items || []).filter(item => {
        const action = item[Object.keys(item)[0]]
        return Boolean(action && action.error)
      })
      const error = new Error(
        `ElasticSearch bulk request rejected ${failures.length} of ${chunks.length} document(s)`
      )
      error.failures = failures
      next(error)
      return
    }

    next()
  }
}
module.exports = ElasticSearchWritableStream
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment