Skip to content

Instantly share code, notes, and snippets.

@romanlab
Last active June 27, 2018 08:51
Show Gist options
  • Save romanlab/87b3d0ccf57421562099c09e98846b6c to your computer and use it in GitHub Desktop.
Save romanlab/87b3d0ccf57421562099c09e98846b6c to your computer and use it in GitHub Desktop.
Simple KCL (Kinesis Client Library) example in Node.js
'use strict';
/*
Taken from: https://github.com/awslabs/amazon-kinesis-client-nodejs
*/
var kcl = require('aws-kcl');
var util = require('util');
/**
 * Record processor object that is handed to the `aws-kcl` runner.
 *
 * It exposes three callback-style hooks: `initialize`, `processRecords` and
 * `shutdown`. Every hook receives an input object plus a `completeCallback`
 * and must invoke `completeCallback` exactly once on every code path.
 * NOTE(review): the exact shape of the input objects is defined by aws-kcl,
 * not by this file -- only the fields read below are relied upon.
 */
var recordProcessor = {
  /**
   * Initialization hook.
   *
   * @param {Object} initializeInput - Not read by this example.
   * @param {Function} completeCallback - Called with no arguments once
   *     initialization is done.
   */
  initialize: function(initializeInput, completeCallback) {
    // Initialization logic ...
    completeCallback();
  },

  /**
   * Decodes each record of a batch and checkpoints at the sequence number of
   * the last record in the batch.
   *
   * @param {Object} processRecordsInput - Expected to carry `records` (an
   *     array of objects with `sequenceNumber`, `partitionKey` and
   *     base64-encoded `data`) and a `checkpointer` exposing
   *     `checkpoint(sequenceNumber, callback)`.
   * @param {Function} completeCallback - Called with no arguments when the
   *     batch is finished; when a checkpoint is issued it is only called
   *     from the checkpoint callback.
   */
  processRecords: function(processRecordsInput, completeCallback) {
    if (!processRecordsInput || !processRecordsInput.records) {
      // Must call completeCallback to proceed further.
      completeCallback();
      return;
    }

    var records = processRecordsInput.records;
    // Holds the sequence number of the last record seen; it stays undefined
    // when the batch is empty.
    var sequenceNumber;
    // Placeholders for the custom processing logic; this example does not
    // use them beyond the assignments below.
    var record, partitionKey, data;

    for (var i = 0; i < records.length; ++i) {
      record = records[i];
      sequenceNumber = record.sequenceNumber;
      partitionKey = record.partitionKey;
      // Note that "data" is a base64-encoded string. Buffer.from decodes it;
      // the deprecated `new Buffer(...)` constructor is avoided because it
      // emits a deprecation warning and allocates a buffer of the given size
      // instead of decoding when it is passed a number.
      data = Buffer.from(record.data, 'base64').toString();
      // Custom record processing logic ...
    }

    if (!sequenceNumber) {
      // Nothing to checkpoint. Must call completeCallback to proceed further.
      completeCallback();
      return;
    }

    // If checkpointing, only call completeCallback once checkpoint operation
    // is complete.
    processRecordsInput.checkpointer.checkpoint(sequenceNumber,
      function(err, checkpointedSequenceNumber) {
        // In this example, regardless of error, we mark processRecords
        // complete to proceed further with more records. `err` is
        // intentionally not inspected here.
        completeCallback();
      }
    );
  },

  /**
   * Shutdown hook. A final checkpoint is issued only when
   * `shutdownInput.reason` is exactly 'TERMINATE'; for any other reason the
   * hook completes immediately without touching the checkpointer.
   *
   * @param {Object} shutdownInput - Expected to carry `reason` and, for
   *     'TERMINATE', a `checkpointer` exposing `checkpoint(callback)`.
   * @param {Function} completeCallback - Called with no arguments once
   *     shutdown handling is done.
   */
  shutdown: function(shutdownInput, completeCallback) {
    // Shutdown logic ...
    if (shutdownInput.reason !== 'TERMINATE') {
      completeCallback();
      return;
    }

    // Since you are checkpointing, only call completeCallback once the
    // checkpoint operation is complete.
    shutdownInput.checkpointer.checkpoint(function(err) {
      // In this example, regardless of error, we mark the shutdown operation
      // complete.
      completeCallback();
    });
  }
};
// Hand the record processor to aws-kcl and invoke run() on the returned object.
var processorRunner = kcl(recordProcessor);
processorRunner.run();
@HafizHassan58
Copy link

Where can I get the inputs for the initialize and processRecords methods,
i.e. initializeInput and processRecordsInput?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment