Skip to content

Instantly share code, notes, and snippets.

@djanowski
Created November 29, 2017 15:28
Show Gist options
  • Save djanowski/c7b2fc5592438c47281cdc3c3fbf3473 to your computer and use it in GitHub Desktop.
Save djanowski/c7b2fc5592438c47281cdc3c3fbf3473 to your computer and use it in GitHub Desktop.
Concurrently transform a MongoDB cursor into a stream of bulk MongoDB updates.
// Concurrently transform a MongoDB cursor into a stream of bulk MongoDB updates.
//
// Usage:
//
// const users = db.collection('users');
// const usersToUpdate = users.find({ email: null });
//
// await eachAsyncUpdate(users, usersToUpdate, async function(user) {
// const userEmail = await findUserEmail(user);
//
// return {
// find: { _id: user._id },
// updateOne: { $set: { email: userEmail } }
// };
// });
//
const highland = require('highland');
async function eachAsyncUpdate(collection, query, updateFn) {
await highland(query)
.map(updateFn)
.map(highland)
.mergeWithLimit(100)
.compact()
.batch(1000)
.map(bulkUpdateBatch)
.map(highland)
.series()
.compact()
.toPromise(Promise);
async function bulkUpdateBatch(operations) {
const bulk = collection.initializeUnorderedBulkOp();
for (const operation of operations) {
bulk.find(operation.find)
.updateOne(operation.updateOne);
}
await bulk.execute();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment