Skip to content

Instantly share code, notes, and snippets.

@baumandm
Created June 10, 2016 16:01
Show Gist options
  • Save baumandm/c10fd31701366cf6b1b8ad996312acf9 to your computer and use it in GitHub Desktop.
Save baumandm/c10fd31701366cf6b1b8ad996312acf9 to your computer and use it in GitHub Desktop.
Streaming from MongoDB to Elasticsearch
// Uses Mongoose to stream a large collection out of MongoDB and into Elasticsearch,
// using the Elasticsearch.js library in bulk mode.
// The bulk request size must be large enough to avoid request timeout errors (the right size depends on the ES cluster specs).
var mongoose = require('mongoose');
// Mongo connection, schema all configured in mongo.js
var mongo = require('./mongo');
var elasticsearch = require('elasticsearch');
// Elasticsearch client; `log: 'info'` sets the client's own logging level.
const client = new elasticsearch.Client({
  host: 'localhost:9200',
  log: 'info',
});

// Target index and mapping type for every document sent to Elasticsearch.
// Elasticsearch rejects index names containing uppercase letters
// ("must be lowercase"), so the index name must be all-lowercase.
const indexName = 'myindex';
const typeName = 'doc';

// The 'myModel' schema is registered by ./mongo (required above).
const Model = mongoose.model('myModel');

// Number of documents received from MongoDB so far.
let count = 0;
// Start time in milliseconds since the epoch, used to report total duration.
const startTime = Date.now();

// Stream the whole collection as plain objects (lean() skips building full
// Mongoose documents) instead of loading it into memory with one query.
// NOTE(review): Query#stream() is deprecated in later Mongoose releases in
// favor of Query#cursor() — confirm against the installed Mongoose version.
const stream = Model.find().lean().stream();

// Pending bulk-API body: alternating action-metadata and document entries.
let bulk = [];
/**
 * Sends the queued bulk body to Elasticsearch and starts a fresh queue.
 *
 * The module-level `bulk` array is swapped for a new empty one synchronously,
 * before the request completes. Documents queued while the request is in
 * flight therefore land in the new batch.
 *
 * Does nothing when the queue is empty, because Elasticsearch rejects a bulk
 * request that contains no actions.
 *
 * Logs the request error, a count of per-item failures, or a success message.
 */
function sendAndEmptyQueue() {
  if (bulk.length === 0) {
    return;
  }

  // Keep our own reference to this batch. `bulk` is reassigned right below,
  // long before the callback runs, so reading `bulk` there would report the
  // wrong batch.
  const batch = bulk;
  bulk = [];
  // The stream handler queues two entries per document: action line + source.
  const docCount = batch.length / 2;

  client.bulk({
    body: batch,
  }, (err, resp) => {
    if (err) {
      console.log(err);
      return;
    }
    // The bulk API reports per-document failures in the response body
    // (`errors: true` plus an `error` on each failed item), not as `err`.
    if (resp && resp.errors) {
      const failed = (resp.items || []).filter((item) => {
        const result = item[Object.keys(item)[0]];
        return Boolean(result && result.error);
      });
      console.log(`Elasticsearch rejected ${failed.length} of ${docCount} documents in bulk request`);
      return;
    }
    console.log(`Sent ${docCount} documents to Elasticsearch!`);
  });
}
// Number of documents per bulk request.
const batchSize = 500;

// Queue each streamed document and flush once every `batchSize` documents.
// NOTE(review): the stream is never paused while a bulk request is in flight,
// so a fast MongoDB read can leave many bulk requests outstanding at once.
stream.on('data', (doc) => {
  count += 1;
  // NOTE(review): `doc` still carries its `_id` field in the source body;
  // some Elasticsearch versions reject `_id` inside the document — confirm
  // against the target cluster.
  bulk.push({ index: { _index: indexName, _type: typeName, _id: doc._id } });
  bulk.push(doc);
  // `=== 0` flushes after documents batchSize, 2*batchSize, ... so every
  // request carries a full batch; the remainder is flushed on 'close'.
  if (count % batchSize === 0) {
    sendAndEmptyQueue();
  }
}).on('error', (err) => {
  // Streams emit 'error'. An 'error' event with no listener is thrown,
  // which would crash the process.
  console.log(`MongoDB Stream Error: ${err}`);
}).on('close', () => {
  // Flush the final partial batch; skip it when nothing is left, since an
  // empty bulk request is rejected by Elasticsearch.
  if (bulk.length > 0) {
    sendAndEmptyQueue();
  }
  console.log('Document Count is: ' + count);
  // Measured when the stream closes; the last bulk request may still be in flight.
  console.log('Duration is: ' + (Date.now() - startTime));
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment