Skip to content

Instantly share code, notes, and snippets.

@thrijith
Created November 30, 2023 13:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thrijith/acab6e1c521fad94e676dd6e6bd6ab2b to your computer and use it in GitHub Desktop.
Save thrijith/acab6e1c521fad94e676dd6e6bd6ab2b to your computer and use it in GitHub Desktop.
const ElasticsearchWritableStream = require('elasticsearch-writable-stream');
// Writable sink that receives the index operations produced by the CSV
// pipeline. `client` and BULK_INDEX_COUNT are defined outside this snippet.
// NOTE(review): per the elasticsearch-writable-stream README, highWaterMark is
// the number of records buffered per bulk request and flushTimeout is the
// idle time in milliseconds before a partial batch is flushed; confirm
// against the installed version.
const elasticSearchStream = new ElasticsearchWritableStream(client, {
    flushTimeout: 500,
    highWaterMark: BULK_INDEX_COUNT,
});
const yauzl = require("yauzl");
const fs = require("fs");
const csv = require("csv-parser");
const through2 = require("through2");
/**
 * Streams every CSV entry of the zip archive at /tmp/<filename> into
 * `elasticSearchStream`, one entry at a time, and settles when the whole
 * archive has been handled.
 *
 * Resolves (with no value) after all entries have been written, the
 * Elasticsearch stream has emitted 'finish', and the archive has been removed
 * from /tmp. Rejects with the underlying error if the archive cannot be
 * opened, an entry cannot be read, a row cannot be parsed, or any stream in
 * the pipeline fails; in that case the archive is left on disk.
 *
 * Relies on names defined outside this snippet: `filename`, `csvHeaders`,
 * `parseCsvData`, `ES_INDEX_NAME`, `NOTIFY_COUNT`, `count`, `totalCount`.
 *
 * NOTE: this block ends `elasticSearchStream` once the archive is exhausted,
 * so the stream cannot be reused for another archive afterwards.
 */
let promise = new Promise((resolve, reject) => {
    const zipPath = `/tmp/${filename}`;

    yauzl.open(zipPath, {lazyEntries: true}, function (openErr, zipfile) {
        if (openErr) {
            // Without this guard `zipfile` is undefined and the promise would never settle.
            reject(openErr);
            return;
        }

        let failed = false;
        let activeReadStream = null;

        // Single failure path: log, stop reading the archive, reject once.
        const fail = function (error) {
            if (failed) {
                return;
            }
            failed = true;
            console.error('Error processing row in ' + filename, error);
            if (activeReadStream) {
                activeReadStream.destroy();
            }
            zipfile.close();
            reject(error);
        };

        // Runs after Elasticsearch has flushed everything that was written.
        const onIndexed = function () {
            console.log('Finished processing ' + count + ' records from ' + filename);
            try {
                fs.unlinkSync(zipPath);
            } catch (unlinkErr) {
                reject(unlinkErr);
                return;
            }
            console.log('Finished processing ' + filename);
            resolve();
        };

        // Registered once (not per entry) so listeners do not pile up on the shared stream.
        elasticSearchStream.on('error', fail);
        zipfile.on('error', fail);

        zipfile.on('entry', entry => {
            if (/\/$/.test(entry.fileName)) {
                // Directory entries carry no data; move on to the next entry.
                zipfile.readEntry();
                return;
            }

            zipfile.openReadStream(entry, function (streamErr, readStream) {
                if (streamErr) {
                    // Throwing here would be an uncaught exception; reject instead.
                    fail(streamErr);
                    return;
                }
                activeReadStream = readStream;

                const parser = csv({headers: csvHeaders});

                // Turns each parsed CSV row into an index operation for the sink.
                const toOperation = through2.obj(function (chunk, enc, callback) {
                    let row;
                    try {
                        row = parseCsvData(chunk, filename);
                    } catch (parseErr) {
                        callback(parseErr);
                        return;
                    }
                    callback(null, {
                        index: ES_INDEX_NAME,
                        type: '_doc',
                        id: row.my_id,
                        action: 'index',
                        body: row,
                    });
                });

                // Counts records and logs progress every NOTIFY_COUNT rows.
                const tally = through2.obj(function (chunk, enc, callback) {
                    count += 1;
                    totalCount += 1;
                    if (0 === count % (NOTIFY_COUNT)) {
                        console.log('Processed ' + count + ' records from ' + filename);
                    }
                    callback(null, chunk);
                });

                // pipe() does not forward errors, so every stage needs its own handler.
                readStream.on('error', fail);
                parser.on('error', fail);
                toOperation.on('error', fail);
                tally.on('error', fail);

                // Request the next entry only after this one is fully handed to the sink.
                tally.on('end', function () {
                    activeReadStream = null;
                    zipfile.readEntry();
                });

                readStream
                    .pipe(parser)
                    .pipe(toOperation)
                    .pipe(tally)
                    // end: false keeps the shared sink open for the remaining entries.
                    .pipe(elasticSearchStream, {end: false});
            });
        });

        zipfile.on('close', () => {
            if (failed) {
                return;
            }
            // Every entry has been written; flush the sink and wait for it to finish.
            elasticSearchStream.once('finish', onIndexed);
            elasticSearchStream.end();
        });

        zipfile.readEntry();
    });
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment