Skip to content

Instantly share code, notes, and snippets.

@rahulsivalenka
Forked from andrhamm/callbacks.js
Created March 21, 2019 00:55
Show Gist options
  • Save rahulsivalenka/1214091a809fd36e50837728a35d552d to your computer and use it in GitHub Desktop.
Save rahulsivalenka/1214091a809fd36e50837728a35d552d to your computer and use it in GitHub Desktop.
Paginating Scans & Queries in DynamoDB with Node.js using Callbacks OR Promises
// Callback-style pagination: scan a table page by page until DynamoDB stops
// returning a LastEvaluatedKey.
const AWS = require('aws-sdk');

// Send the SDK's request logging to the console.
AWS.config.logger = console;

const dynamodb = new AWS.DynamoDB({ apiVersion: '2012-08-10' });

const val = 'some value';

// Request parameters shared by every page; only ExclusiveStartKey changes
// between requests.
const params = {
  TableName: "MyTable",
  ExpressionAttributeValues: {
    ':val': {
      S: val,
    },
  },
  Limit: 1000,
  FilterExpression: 'MyAttribute = :val'
};

// The named function expression lets the callback re-issue the scan with
// itself as the handler for the next page.
dynamodb.scan(params, function scanUntilDone(err, data) {
  if (err) {
    console.log(err, err.stack);
    return;
  }

  // do something with this page of 0-1000 results

  if (data.LastEvaluatedKey) {
    // More pages remain: resume right after the last key of this page.
    params.ExclusiveStartKey = data.LastEvaluatedKey;
    dynamodb.scan(params, scanUntilDone);
  } else {
    // all results processed. done
  }
});
/**
 * Splits an array into consecutive chunks of at most `chunkSize` elements.
 * The input array is not modified; the last chunk may be shorter.
 *
 * @param {Array} arr - Array to split.
 * @param {number} [chunkSize=10] - Maximum number of elements per chunk; must be greater than 0.
 * @returns {Array<Array>} The chunks in original order (empty when `arr` is empty).
 * @throws {RangeError} If `chunkSize` is not greater than 0 (including NaN).
 */
export const chunkArray = (arr, chunkSize = 10) => {
  // A zero or negative step never moves the cursor forward, which would loop
  // forever; NaN would silently produce a single empty chunk.
  if (!(chunkSize > 0)) {
    throw new RangeError(`chunkSize must be greater than 0, got ${chunkSize}`);
  }

  const chunks = [];
  for (let start = 0; start < arr.length; start += chunkSize) {
    chunks.push(arr.slice(start, start + chunkSize));
  }
  return chunks;
};
const { chunkArray } = require('./chunk_array');
/**
 * Runs a DynamoDB `query` when `additionalParams.KeyConditionExpression` is
 * present, otherwise a `scan`, and keeps requesting pages for as long as the
 * response carries a `LastEvaluatedKey`.
 *
 * `batchCallback` is invoked once per batch as `batchCallback(items, stats)`;
 * its return value is ignored (it is not awaited). Pages without items do not
 * trigger the callback.
 *
 * @param {Object} additionalParams - Request parameters, merged over the default `Limit: 1000`.
 *   The caller's object is copied, not mutated.
 * @param {Function} batchCallback - Receives each batch of items and the running stats object.
 * @param {number} [batchSize] - When truthy, each page is split into chunks of at most this
 *   many items; otherwise each page is delivered as a single batch.
 * @returns {Promise<Object>} Resolves with the stats object once the last page is processed:
 *   `pages` (responses received), `items` (items received), `batches` (batches handed to the
 *   callback) and `batch_index` (index of the most recent chunk within its page).
 *   Rejects if a request fails or the callback throws.
 */
export const dynamoDbSearch = (additionalParams, batchCallback, batchSize) => {
  const scanOrQuery = additionalParams.KeyConditionExpression ? 'query' : 'scan';
  const params = {
    Limit: 1000,
    ...additionalParams,
  };
  const stats = {
    pages: 0,
    items: 0,
    batches: 0,
    batch_index: 0,
  };

  const processResp = (resp) => {
    // Treat a response without Items as an empty page so the stats update
    // cannot throw before the emptiness check.
    const items = resp.Items || [];
    stats.pages += 1;
    stats.items += items.length;

    if (items.length > 0) {
      if (batchSize) {
        const chunks = chunkArray(items, batchSize);
        stats.batches += chunks.length;
        chunks.forEach((chunk, index) => {
          stats.batch_index = index;
          if (chunk && chunk.length > 0) {
            batchCallback(chunk, stats);
          }
        });
      } else {
        // if batchSize isn't specified, don't chunk the response: the whole
        // page counts as one batch
        stats.batches += 1;
        batchCallback(items, stats);
      }
    }

    if (resp.LastEvaluatedKey) {
      // More pages remain: resume right after the last key of this page.
      params.ExclusiveStartKey = resp.LastEvaluatedKey;
      return requestPage();
    }
    return Promise.resolve(stats);
  };

  // Issues one request with the current params and processes its response.
  const requestPage = () => dynamodb[scanOrQuery](params)
    .promise()
    .then(processResp);

  return requestPage();
};
// Usage example
//
// The request object gets its own name so it cannot collide with another
// top-level `params` declaration in the same scope.
const searchParams = {
  TableName: "MyTable",
  ExpressionAttributeValues: {
    ':val': {
      S: val,
    },
  },
  Limit: 1000,
  FilterExpression: 'MyAttribute = :val'
};

dynamoDbSearch(
  searchParams,
  (batch) => {
    // do something with this batch of 1-25 results
  },
  25,
)
  .then(() => {
    // all results processed. done
  })
  .catch((err) => {
    console.log(err, err.stack);
  });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment