Skip to content

Instantly share code, notes, and snippets.

@seansullivan
Last active June 30, 2021 18:58
Show Gist options
  • Save seansullivan/295ff4ed11b453c1a6f7ea573905cd9b to your computer and use it in GitHub Desktop.
Save seansullivan/295ff4ed11b453c1a6f7ea573905cd9b to your computer and use it in GitHub Desktop.
Log Records from AWS Kinesis Stream with Node.js
const AWS = require('aws-sdk');
// Have the SDK's `.promise()` calls return the runtime's built-in Promise.
// This removes the dependency on Q, whose npm package is named 'q' and so
// cannot be resolved as require('Q') on case-sensitive file systems.
AWS.config.setPromisesDependency(Promise);
const _ = require('lodash');
const moment = require('moment');
// Kinesis client used for every request below; the region is hard-coded to us-east-1.
const kinesis = new AWS.Kinesis({region: 'us-east-1'});
// Name of the stream to read records from.
const streamName = 'my-stream';
// Number of records to log before paging stops.
const recordsToRetrieve = 10;
// Running count of records seen so far; incremented by retrieveRecords.
let recordsRetrieved = 0;
/**
 * Logs one batch of Kinesis records and keeps paging through the shard until
 * `recordsToRetrieve` records have been seen or the response carries no
 * further shard iterator.
 *
 * Reads the module-level `kinesis` client and `recordsToRetrieve`, and
 * increments the module-level `recordsRetrieved` counter.
 *
 * @param {Object} recordsResult - Response of `kinesis.getRecords`; its
 *   `Records` and `NextShardIterator` properties are read.
 * @returns {Array|Promise<Array>} The records of the last batch fetched:
 *   returned directly when paging stops with this batch, otherwise through
 *   the promise of the follow-up `getRecords` request.
 */
const retrieveRecords = (recordsResult) => {
  const { Records: records, NextShardIterator: nextShardIterator } = recordsResult;
  recordsRetrieved += _.size(records);

  _.forEach(records, (record) => {
    const recordData = record.Data.toString('utf-8');

    // A payload that is not valid JSON is printed as the raw string instead
    // of aborting the whole run on a single unparsable record.
    let recordDataParsed;
    try {
      recordDataParsed = JSON.parse(recordData);
    } catch (parseError) {
      recordDataParsed = recordData;
    }

    console.log(`Record with sequence number: ${record.SequenceNumber} and partition key: ${record.PartitionKey}`);
    console.log(recordDataParsed);
    console.log('');
  });

  // isNil treats a missing (undefined) iterator the same as an explicit null,
  // so we never issue a getRecords call without a ShardIterator.
  if (_.isNil(nextShardIterator) || recordsRetrieved >= recordsToRetrieve) {
    return records;
  }

  // Only ask for as many records as are still needed to reach the target.
  return kinesis
    .getRecords({ ShardIterator: nextShardIterator, Limit: recordsToRetrieve - recordsRetrieved })
    .promise()
    .then(retrieveRecords);
};
// Entry point: describe the stream (requesting a single shard), open an
// AT_TIMESTAMP iterator on that shard starting one month back, then log
// records through retrieveRecords. Any failure is printed to stderr.
kinesis.describeStream({ StreamName: streamName, Limit: 1 }).promise()
  .then((streamDetails) => {
    const shards = _.get(streamDetails, 'StreamDescription.Shards', []);
    const firstShard = _.head(shards);

    // Fail with a descriptive message instead of a TypeError on
    // `undefined.ShardId` when the stream description lists no shards.
    if (!firstShard) {
      throw new Error(`No shards found for stream "${streamName}"`);
    }

    const params = {
      ShardId: firstShard.ShardId,
      ShardIteratorType: 'AT_TIMESTAMP',
      StreamName: streamName,
      Timestamp: moment().subtract(1, 'month').toDate()
    };

    return kinesis.getShardIterator(params).promise();
  })
  .then((iteratorResponse) => {
    const { ShardIterator: shardIterator } = iteratorResponse;

    return kinesis
      .getRecords({ ShardIterator: shardIterator, Limit: recordsToRetrieve })
      .promise()
      .then(retrieveRecords);
  })
  .catch((error) => {
    console.error(error);
    console.error(error.stack);
  });
@sonnyrajagopalanuptycs
Copy link

Q is no longer supported. Do you have a workaround?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment