Skip to content

Instantly share code, notes, and snippets.

@sibelius
Created February 11, 2020 14:24
Show Gist options
  • Save sibelius/3ec1878ebb74368f647eb30b57eaf279 to your computer and use it in GitHub Desktop.
Save sibelius/3ec1878ebb74368f647eb30b57eaf279 to your computer and use it in GitHub Desktop.
Mongo cursor processing - lets you select a strategy for processing the elements of a Cursor so you can iterate through all items
/*
 * Usage:
 *   const cursor = Test.find().sort({ name: 1 }).cursor();
 *   const batches = batchCursor(cursor, 100);
 *   const first100 = await batches.next();
 *   console.log(first100.value.length);
 *
 * See: https://gist.github.com/lineus/3f7d826a21796129db968d6590c93faa
 */
/**
 * Groups the items produced by a cursor into arrays of up to `n` items.
 *
 * Every batch, including a trailing partial one, is yielded, so the generator
 * can be consumed with `for await (const batch of batchCursor(cursor, n))`.
 * The generator's final return value is always an (empty) array, which keeps
 * consumers that drive it by hand with `.next()` and iterate `value` even when
 * `done` is true working without a special case.
 *
 * @param c - Any object exposing `next()` that resolves to the next item, or to
 *   `null`/`undefined` once the cursor is exhausted.
 * @param n - Maximum number of items per batch; must be greater than zero.
 * @returns An async generator of item arrays.
 * @throws RangeError when `n` is not a positive number (raised on the first
 *   `.next()` call, since generator bodies run lazily).
 */
export async function* batchCursor(c, n) {
  // A non-positive size would never fill a batch and would yield empty arrays
  // forever; the negated comparison also rejects NaN and undefined.
  if (!(n > 0)) {
    throw new RangeError(`batchCursor: batch size must be a positive number, received ${n}`);
  }

  let batch = [];
  while (true) {
    const item = await c.next();
    // Only null/undefined mark exhaustion, so falsy items such as 0 or ''
    // do not end the iteration early.
    if (item == null) {
      break;
    }
    batch.push(item);
    if (batch.length >= n) {
      yield batch;
      batch = [];
    }
  }

  // Yield the trailing partial batch so `for await` consumers receive it.
  if (batch.length > 0) {
    yield batch;
  }
  return [];
}
import { Document, QueryCursor } from 'mongoose';
import { batchCursor } from './batchCursor';
import { promiseAllLimit } from './promiseAllLimit';
/**
 * Identifiers of the available processing strategies. Each value is used as a
 * key of the dispatch table built inside `cursorProcessing`.
 */
export const CURSOR_PROCESSING = {
// Handled by cursorProcessingSingle.
SINGLE: 'SINGLE',
// Handled by cursorProcessingBatch; this is the default `type` of cursorProcessing.
BATCH: 'BATCH',
// Handled by cursorProcessingPromiseLimitAll.
PROMISE_LIMIT_ALL: 'PROMISE_LIMIT_ALL',
};
// Exported batch size of 1000 — presumably intended as the shared default for
// the `batchSize` parameters in this module; confirm against callers.
export const BATCH_SIZE = 1000;
// Callback invoked with each document; the promise it returns is awaited by
// every strategy in this module.
type ProcessItem = (doc: Document) => Promise<void>;
/**
 * SINGLE strategy: walks the cursor with async iteration and awaits
 * `processItem` for each document before requesting the next one, so at most
 * one document is being processed at any time.
 *
 * @param cursor - Async-iterable source of documents.
 * @param processItem - Callback awaited once per document.
 */
async function cursorProcessingSingle(cursor: QueryCursor, processItem: ProcessItem): Promise<void> {
  for await (const item of cursor) {
    await processItem(item);
  }
}
/**
 * BATCH strategy: opens a cursor with the given `batchSize`, groups its
 * documents into arrays via `batchCursor`, and awaits `processItem` for each
 * document of each group, one at a time.
 *
 * NOTE(review): this calls `cursor.cursor(...)`, which suggests callers pass a
 * mongoose Query rather than an already-opened QueryCursor — confirm and fix
 * the parameter annotation accordingly.
 *
 * @param cursor - Object exposing `cursor({ batchSize })`.
 * @param processItem - Callback awaited once per document.
 * @param batchSize - Cursor batch size and group size; defaults to `BATCH_SIZE`.
 */
const cursorProcessingBatch = async (cursor: QueryCursor, processItem: ProcessItem, batchSize = BATCH_SIZE) => {
  const batchedCursor = cursor.cursor({ batchSize });
  const batched = batchCursor(batchedCursor, batchSize);

  while (true) {
    const { value, done } = await batched.next();

    // The final step may carry the last documents as the generator's return
    // value, or nothing at all; tolerate both so an absent value cannot throw.
    if (value) {
      for (const doc of value) {
        await processItem(doc);
      }
    }

    if (done) {
      break;
    }
  }
};
/**
 * PROMISE_LIMIT_ALL strategy: awaits `cursor` to obtain the items, then hands
 * them to `promiseAllLimit` so that `processItem` runs concurrently on chunks
 * of `batchSize` items.
 *
 * NOTE(review): `await cursor` only yields an array when the argument is
 * thenable (presumably a mongoose Query, which would load every document into
 * memory at once) — confirm what callers pass here.
 *
 * @param cursor - Value awaited to obtain the items to process.
 * @param processItem - Callback awaited once per document.
 * @param batchSize - Number of documents processed concurrently; defaults to
 *   `BATCH_SIZE`.
 */
const cursorProcessingPromiseLimitAll = async (cursor: QueryCursor, processItem: ProcessItem, batchSize = BATCH_SIZE) => {
  const items = await cursor;

  // Keep the wrapper: it passes only the document to processItem (not the
  // index/array arguments supplied by Array.prototype.map) and turns a
  // synchronous throw into a rejection.
  await promiseAllLimit(items, batchSize, async doc => {
    await processItem(doc);
  });
};
/**
 * Processes every document of `cursor` with `processItem`, using the strategy
 * named by `type`.
 *
 * @param cursor - Source of documents, forwarded to the selected strategy.
 * @param processItem - Callback awaited for each document.
 * @param type - One of the `CURSOR_PROCESSING` values; defaults to
 *   `CURSOR_PROCESSING.BATCH`.
 * @param batchSize - Forwarded to the selected strategy as its third argument;
 *   defaults to `BATCH_SIZE`.
 * @returns A promise that resolves once the selected strategy has finished. If
 *   `type` is not a known strategy, a message is logged and the promise
 *   resolves without processing anything.
 */
export const cursorProcessing = async (
  cursor: QueryCursor,
  processItem: ProcessItem,
  type = CURSOR_PROCESSING.BATCH,
  batchSize = BATCH_SIZE,
) => {
  const cursorProcessingFn = {
    [CURSOR_PROCESSING.SINGLE]: cursorProcessingSingle,
    [CURSOR_PROCESSING.BATCH]: cursorProcessingBatch,
    [CURSOR_PROCESSING.PROMISE_LIMIT_ALL]: cursorProcessingPromiseLimitAll,
  };

  // Validate against the dispatch table itself (keyed by the CURSOR_PROCESSING
  // values) so the check and the lookup cannot drift apart. An own-property
  // test keeps inherited names such as 'toString' from being accepted.
  if (!Object.prototype.hasOwnProperty.call(cursorProcessingFn, type)) {
    // eslint-disable-next-line
    console.log('wrong cursor processing type');
    return;
  }

  const fn = cursorProcessingFn[type];
  await fn(cursor, processItem, batchSize);
};
/**
 * Runs `fn` over `items` in consecutive chunks of at most `limit` items. All
 * calls within a chunk are started together and awaited with `Promise.all`
 * before the next chunk begins, so a slow item delays the start of the
 * following chunk.
 *
 * `fn` is passed straight to `Array.prototype.map`, so it also receives the
 * index within the chunk and the chunk array as its second and third arguments.
 *
 * @param items - Array of items to process.
 * @param limit - Maximum number of items processed concurrently; must be
 *   greater than zero.
 * @param fn - Callback invoked for each item; its result is awaited.
 * @returns A promise that resolves once every chunk has completed and rejects
 *   with the first rejection from `fn`.
 * @throws RangeError (as a rejection) when `limit` is not a positive number.
 */
export async function promiseAllLimit(items, limit, fn) {
  // A zero or negative step would never advance `start` past the end (infinite
  // loop) and NaN would silently skip all work, so reject both up front.
  if (!(limit > 0)) {
    throw new RangeError(`promiseAllLimit: limit must be a positive number, received ${limit}`);
  }

  for (let start = 0; start < items.length; start += limit) {
    // slice() clamps the end index to the array length, so no manual bound.
    await Promise.all(items.slice(start, start + limit).map(fn));
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment