Skip to content

Instantly share code, notes, and snippets.

@alexbeletsky
Created May 1, 2014 09:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexbeletsky/0565a49bc2cfeadb6d3f to your computer and use it in GitHub Desktop.
Save alexbeletsky/0565a49bc2cfeadb6d3f to your computer and use it in GitHub Desktop.
Using streams for patching db
var _ = require('underscore');
var through = require('through');
var moment = require('moment');
var logger = require('../source/utils/logger');
var log = require('single-line-log');
function run(db, callback) {
var started = moment();
var patched = 0;
db.items.count({user: {$type: 3}}, function (err, total) {
if (err) {
return callback(err);
}
var fix = through(function (item) {
var copy = _.clone(item);
var email = item.user.email;
var userData = item.user;
var fixed = _.extend(copy, {user: email, userData: userData});
this.queue(fixed);
});
var save = through(function (item) {
var me = this;
me.pause();
db.items.findAndModify({
query: {_id: item._id},
update: { $set: _.omit(item, '_id') },
'new': true,
}, function (err, item) {
err && me.emit('error', err);
me.resume();
me.queue(item);
});
});
var progress = through(function () {
var percentage = Math.floor(100 * ++patched / total);
log('Patching collections\n[' + percentage + '%]', patched, 'documents patched');
});
var stream = db.items
.find({user: {$type: 3}})
.pipe(fix)
.pipe(save)
.pipe(progress);
stream.on('error', function (err) {
callback(err);
});
stream.on('end', function () {
var finished = moment();
var duration = moment.duration(finished.diff(started)).asSeconds();
logger.info({message: 'patch: ' + __filename + ' executed'});
callback(null, {documents: patched, time: duration});
});
});
}
module.exports = {
run: run
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment