@markbirbeck
Created October 29, 2018 11:33
const stream = require('stream')

class ElasticSearchWritableStream extends stream.Writable {
  ...
  async _writev(chunks, next) {
    const body = chunks
      .map(chunk => chunk.chunk)
      .reduce((arr, obj) => {
        /**
         * Each entry to the bulk API comprises an instruction (like 'index'
         * or 'delete') and some data:
         */
        arr.push({ index: { } })
        arr.push(obj)
        return arr
      }, [])

    /**
     * Push the array of actions to ES and indicate that we are ready
     * for more data. Be sure to propagate any errors:
     */
    try {
      await this.client.bulk({
        index: this.config.index,
        type: this.config.type,
        body
      })
      next()
    } catch (err) {
      next(err)
    }
  }
}
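
The snippet above elides the constructor and the client setup, so the following is only a minimal sketch of how the batching in `_writev` could be exercised without a running cluster. Everything outside `_writev` is an assumption made for illustration: `FakeClient` is a hypothetical stand-in that just records `bulk()` calls, `buildBulkBody` is a hypothetical helper that does the same thing as the `map`/`reduce` pair, the `(client, config)` constructor signature is a guess, and `my-index` / `my-type` are placeholder names.

```javascript
const stream = require('stream')

// Hypothetical stand-in for an Elasticsearch client: it only records calls.
class FakeClient {
  constructor() {
    this.calls = []
  }

  async bulk(params) {
    this.calls.push(params)
    return { errors: false }
  }
}

// Hypothetical helper: one { index: {} } action line followed by the
// document itself, for every buffered chunk.
function buildBulkBody(chunks) {
  const body = []
  for (const { chunk } of chunks) {
    body.push({ index: {} })
    body.push(chunk)
  }
  return body
}

class ElasticSearchWritableStream extends stream.Writable {
  // Assumed constructor; the original elides this part.
  constructor(client, config) {
    super({ objectMode: true })
    this.client = client
    this.config = config
  }

  // A single unbuffered write is routed through the same bulk path.
  _write(chunk, encoding, next) {
    this._writev([{ chunk, encoding }], next)
  }

  async _writev(chunks, next) {
    try {
      await this.client.bulk({
        index: this.config.index,
        type: this.config.type,
        body: buildBulkBody(chunks)
      })
      next()
    } catch (err) {
      next(err)
    }
  }
}

const client = new FakeClient()
const sink = new ElasticSearchWritableStream(client, {
  index: 'my-index',
  type: 'my-type'
})

sink.on('finish', () => {
  console.log('bulk calls:', client.calls.length)
  console.log('first body:', JSON.stringify(client.calls[0].body))
})

// cork() buffers the writes so that uncork() can hand them to _writev
// together instead of one at a time.
sink.cork()
sink.write({ id: 1 })
sink.write({ id: 2 })
sink.write({ id: 3 })
sink.uncork()
sink.end()
```

Corking is used here only to make the batching visible. In a real pipeline the writes pile up on their own whenever the upstream produces records faster than a `bulk()` call completes, and `_writev` then receives everything that queued up in the meantime.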