Skip to content

Instantly share code, notes, and snippets.

@Deepak13245
Last active September 8, 2023 20:54
Show Gist options
  • Save Deepak13245/d920fe4035dc52894dc084c29f7b4b5e to your computer and use it in GitHub Desktop.
Save Deepak13245/d920fe4035dc52894dc084c29f7b4b5e to your computer and use it in GitHub Desktop.
import { Transform, Writable } from 'stream';
import { pipeline } from 'stream/promises';
/**
 * Streams every document matched by `filters` through `transformFunc` and then
 * `saveFunc`, one document at a time, using a read -> transform -> write
 * pipeline so the whole result set is never buffered at once.
 *
 * @param {object} model - Object exposing `find(filters)` whose result has a
 *   `cursor(options)` method returning a readable source accepted by
 *   `pipeline` (presumably a Mongoose model — confirm against callers).
 * @param {object} [filters={}] - Query filters forwarded verbatim to `model.find`.
 * @param {(doc: any, encoding: string) => any} transformFunc - Sync or async
 *   mapper; its (awaited) result is handed to the save stage.
 * @param {(doc: any) => any} saveFunc - Sync or async sink called with each
 *   transformed document; the next document is not written until it settles.
 * @param {{ batchSize?: number }} [options] - `batchSize` is used both as the
 *   cursor batch size and as the high-water mark of every stream stage.
 *   Missing or falsy values (and a `null`/`undefined` options object) fall
 *   back to 10.
 * @returns {Promise<void>} Resolves once every document has been saved.
 * @throws Rejects with the first error raised by the cursor, `transformFunc`
 *   or `saveFunc`.
 */
export async function updateWithStream(
  model,
  filters = {},
  transformFunc,
  saveFunc,
  options = {
    batchSize: 10,
  },
) {
  // The default parameter only covers `undefined`; `?.` also tolerates an
  // explicit `null`. Resolve once so all stages agree on the same value.
  const batchSize = options?.batchSize || 10;

  const readStream = model.find(filters).cursor({ batchSize });

  const transformStream = new Transform({
    objectMode: true,
    readableHighWaterMark: batchSize,
    writableHighWaterMark: batchSize,
    transform: async (doc, encoding, callback) => {
      let transformed;
      try {
        transformed = await transformFunc(doc, encoding);
      } catch (e) {
        callback(e);
        return;
      }
      // Kept outside the try block: if the callback itself throws we must not
      // invoke it a second time (that would raise ERR_MULTIPLE_CALLBACK and
      // hide the original failure).
      callback(null, transformed);
    },
  });

  const saveStream = new Writable({
    objectMode: true,
    highWaterMark: batchSize,
    write: async (doc, encoding, callback) => {
      try {
        await saveFunc(doc);
      } catch (e) {
        callback(e);
        return;
      }
      // Same reasoning as in the transform stage: signal success exactly once.
      callback();
    },
  });

  await pipeline(readStream, transformStream, saveStream);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment