Skip to content

Instantly share code, notes, and snippets.

@benjamine
Last active January 29, 2018 23:50
Show Gist options
  • Save benjamine/800562afc3a1e53375f8c4808cee4158 to your computer and use it in GitHub Desktop.
Save benjamine/800562afc3a1e53375f8c4808cee4158 to your computer and use it in GitHub Desktop.
node script to fix mongo docs with controlled concurrency
#!/usr/bin/env node
/*
Requirements:
- node v8.9.3+
- npm install mongodb
- npm install p-queue
*/
const PQueue = require('p-queue');
const mongodb = require('mongodb');
const MongoClient = mongodb.MongoClient;
const ObjectId = mongodb.ObjectId;
// Example run: for every matching document, rebuild its `users` array as a list
// of DBRef-style entries pointing at the matching documents of the `user` collection.
runBatchFix({
  /* batch fix parameters */
  url: 'mongodb://INSERTYOURMONGOHOSTNAME:27017/',
  dbName: 'INSERTYOURMONGODBNAME',
  collection: 'INSERTYOURMONGOCOLLECTIONNAME',
  // only the id of each document is needed by the fix below
  documentProjection: { _id: 1 },
  // narrow down the documents to process, for example:
  //   _id: new ObjectId('11111c1c1b11b1c11a11cc1'),
  filter: {},
  // maximum number of fixDocument calls running at the same time
  concurrency: 10,
  // example fix function: overwrite the customer doc's .users array with
  // references to its users, looked up in the users collection
  fixDocument: async (customer, db) => {
    const matchingUsers = await db
      .collection('user')
      .find({ customerId: customer._id.toString() })
      .toArray();

    const userRefs = [];
    for (const user of matchingUsers) {
      userRefs.push({ $ref: 'user', $id: new ObjectId(user._id) });
    }

    const customers = db.collection('customer');
    await customers.updateOne(
      { _id: new ObjectId(customer._id) },
      { $set: { users: userRefs } }
    );
  },
});
/**
 * Connects to MongoDB, loads the documents matching `filter` and runs
 * `fixDocument` on each of them with at most `concurrency` fixes in flight.
 *
 * The process is terminated when the batch ends: exit code 0 once every
 * document has been fixed (or none matched), exit code 1 when connecting,
 * loading, fixing or closing the connection fails.
 *
 * @param {object} params
 * @param {string} params.url - connection string handed to MongoClient.connect
 * @param {string} params.dbName - database to operate on
 * @param {string} params.collection - collection whose documents are processed
 * @param {object} [params.documentProjection] - projection applied when loading
 *   the documents (e.g. `{ _id: 1 }`)
 * @param {object} params.filter - query selecting the documents to process
 * @param {number} params.concurrency - passed to PQueue as its concurrency
 * @param {function(object, object): Promise} params.fixDocument - called as
 *   `fixDocument(doc, db)` for every loaded document
 * @returns {void}
 */
function runBatchFix({
  url, dbName, collection,
  documentProjection,
  filter,
  concurrency,
  fixDocument,
}) {
  MongoClient.connect(url, async function(err, client) {
    if (err) {
      console.error(err);
      process.exit(1);
      return;
    }
    // eslint-disable-next-line fp/no-let
    let exitCode = 0;
    try {
      console.log('connected to', url);
      await fixThemAll(client);
    } catch (err) {
      console.error(err);
      exitCode = 1;
    }
    try {
      await client.close();
    } catch (closeErr) {
      console.error(closeErr);
      exitCode = 1;
    }
    process.exit(exitCode);
  });

  // Loads the matching documents and resolves once all of them are fixed;
  // rejects with the first error raised while loading or fixing.
  async function fixThemAll(client) {
    const db = client.db(dbName);
    console.log('counting documents at', dbName, ', collection', collection, filter);
    const cursor = db.collection(collection).find(filter);
    if (documentProjection) {
      // set the projection on the cursor: a bare projection object passed as
      // the second argument of find() is treated as options and ignored
      cursor.project(documentProjection);
    }
    const documents = await cursor.toArray();
    // progress is measured against the documents actually loaded, so it
    // cannot drift from a separately executed count query
    const count = documents.length;
    if (count < 1) {
      console.log('zero documents found');
      return;
    }
    console.log('fixing', count, 'document(s)');
    // eslint-disable-next-line fp/no-let
    let fixes = 0;
    // eslint-disable-next-line fp/no-let
    let aborted = false;
    const queue = new PQueue({concurrency});
    const start = Date.now();

    async function fixOne(doc) {
      if (aborted) {
        // a previous fix failed: do not start fixes that are still queued
        return;
      }
      try {
        await fixDocument(doc, db);
      } catch (fixErr) {
        // eslint-disable-next-line fp/no-mutation
        aborted = true;
        throw fixErr;
      }
      // eslint-disable-next-line fp/no-mutation
      fixes++;
      const eta = ((Date.now() - start) / fixes) *
        (count - fixes) / 1000;
      console.log('doc', doc._id, 'fixed,', fixes, 'of', count,
        'ETA', eta, 'seconds');
    }

    // awaiting the promises returned by queue.add() is what lets a failing
    // fix reach the caller's try/catch instead of becoming an unhandled rejection
    await Promise.all(documents.map(doc => queue.add(() => fixOne(doc))));
    console.log('done! took ', (Date.now() - start) / 1000, 'seconds');
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment