@emaxerrno
Forked from andresaristizabal/elastic.js
Last active January 27, 2021 03:26
Wasm definition for indexing record values to an Elasticsearch server
const {
  SimpleTransform,
  PolicyError,
} = require("@vectorizedio/wasm-api");
const { Client } = require("@elastic/elasticsearch");
const client = new Client({ node: "http://localhost:9200" });
const transform = new SimpleTransform();
/* Topics that fire the transform function */
transform.subscribe(["produce"]);
/* The strategy the transform engine will use when handling errors */
transform.errorHandler(PolicyError.SkipOnFailure);
/* Transform function: index matching records into Elasticsearch, then pass the batch through */
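transform.processRecord((recordBatch) => {
  const result = new Map();
  return Promise.all(
    recordBatch.records.map((record) => {
      console.log("applying...");
      /* Only the first header is inspected; a record with no headers is skipped */
      const header = record.headers[0];
      if (
        header &&
        (Buffer.from("save").equals(header.headerKey) ||
          Buffer.from("elastic").equals(header.value))
      ) {
        return client.index({
          index: "result_wasm",
          /* Mapping types are deprecated in Elasticsearch 7.x; drop this field if the server rejects it */
          type: "sometype",
          body: {
            record_message: record.value.toString(),
          },
        });
      }
      /* Non-matching records resolve immediately with undefined */
      return undefined;
    })
  )
    .then(() => {
      /* Publish the unchanged batch under the "elastic" key */
      result.set("elastic", recordBatch);
      return result;
    })
    /* An indexing failure is logged and the callback then resolves with undefined */
    .catch((e) => console.log(e));
});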
exports["default"] = transform;
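The header check in the transform can be pulled into a small predicate so it can be exercised without a broker or an Elasticsearch node. The sketch below is only an illustration: `shouldIndex` is a hypothetical helper, not part of `@vectorizedio/wasm-api`, and it assumes each header is an object with Buffer-valued `headerKey` and `value` fields, which is how the transform reads them. It keeps the same rule (first header only, key "save" or value "elastic") and returns false instead of throwing when headers are missing.

```javascript
// Hypothetical helper: mirrors the transform's header test, but tolerates
// records with no headers. Assumes headers look like
// { headerKey: Buffer, value: Buffer }.
function shouldIndex(record) {
  const first =
    record && Array.isArray(record.headers) ? record.headers[0] : undefined;
  if (!first) {
    return false;
  }
  // Buffer#equals throws on non-buffer arguments, so check the type first.
  const keyMatches =
    Buffer.isBuffer(first.headerKey) &&
    Buffer.from("save").equals(first.headerKey);
  const valueMatches =
    Buffer.isBuffer(first.value) &&
    Buffer.from("elastic").equals(first.value);
  return keyMatches || valueMatches;
}
```

Inside the `map` callback, the condition could then read `if (shouldIndex(record)) { ... }`. If both the key and the value are meant to match, the final `||` would become `&&`; the gist as written accepts either.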