Skip to content

Instantly share code, notes, and snippets.

@romanlab
Last active April 4, 2017 05:54
Show Gist options
  • Save romanlab/7c15c82a8c25c11c8a9067a401828554 to your computer and use it in GitHub Desktop.
Save romanlab/7c15c82a8c25c11c8a9067a401828554 to your computer and use it in GitHub Desktop.
const kcl = require('aws-kcl');
const bluebird = require('bluebird');
const util = require('util');
let fs = require('fs');
fs = bluebird.promisifyAll(fs);
/**
 * Record processor handed to the aws-kcl runner.
 *
 * Every lifecycle hook first writes a one-line trace marker to `test.txt`
 * and then signals completion through `completeCallback`, which is invoked
 * exactly once per hook call. If the trace write rejects, the hook rejects
 * before `completeCallback` is reached.
 */
class RecordProcessor {
  /**
   * Starts the trace file and reports completion.
   *
   * @param {object} initializeInput - Initialization details; not read here.
   * @param {Function} completeCallback - Called once the trace line is written.
   * @returns {Promise<void>}
   */
  async initialize(initializeInput, completeCallback) {
    // writeFile (rather than append) replaces any previous file contents.
    await fs.writeFileAsync('test.txt', 'Initializing\n');
    completeCallback();
  }

  /**
   * Handles one batch of records and checkpoints at the end of the batch.
   *
   * The checkpoint is taken once, at the sequence number of the last record
   * in the batch, and `completeCallback` is called exactly once: either
   * directly (nothing to checkpoint) or from the checkpoint callback.
   *
   * @param {object} processRecordsInput - Batch input; expected to carry
   *   `records` (array of `{ sequenceNumber, partitionKey, data }`) and a
   *   `checkpointer` exposing `checkpoint(sequenceNumber, callback)`.
   * @param {Function} completeCallback - Must be called to proceed further.
   * @returns {Promise<void>}
   */
  async processRecords(processRecordsInput, completeCallback) {
    await fs.appendFileAsync('test.txt', 'Processing\n');

    if (!processRecordsInput || !processRecordsInput.records) {
      // Must call completeCallback to proceed further.
      completeCallback();
      return;
    }

    const { records, checkpointer } = processRecordsInput;

    // Per-record handling belongs here. To read a payload, decode it with
    // Buffer.from(record.data, 'base64') instead of the deprecated
    // `new Buffer(...)` constructor.

    const lastRecord = records[records.length - 1];
    const sequenceNumber = lastRecord ? lastRecord.sequenceNumber : undefined;

    if (!sequenceNumber) {
      // Empty batch, or no usable sequence number: nothing to checkpoint,
      // but completion must still be reported so processing can continue.
      completeCallback();
      return;
    }

    // Checkpoint once per batch. Checkpointing inside a per-record loop would
    // fire completeCallback once per record instead of once per batch.
    checkpointer.checkpoint(sequenceNumber, () => {
      // In this example, regardless of error, we mark processRecords
      // complete to proceed further with more records.
      completeCallback();
    });
  }

  /**
   * Records the shutdown in the trace file and, when the reason is
   * 'TERMINATE', checkpoints before reporting completion.
   *
   * @param {object} shutdownInput - Carries `reason` and a `checkpointer`
   *   exposing `checkpoint(callback)`.
   * @param {Function} completeCallback - Called once shutdown handling is done.
   * @returns {Promise<void>}
   */
  async shutdown(shutdownInput, completeCallback) {
    await fs.appendFileAsync('test.txt', 'Shutting Down\n');

    // Any reason other than 'TERMINATE' completes without checkpointing.
    if (shutdownInput.reason !== 'TERMINATE') {
      completeCallback();
      return;
    }

    // Since we are checkpointing, only call completeCallback once the
    // checkpoint operation is complete.
    shutdownInput.checkpointer.checkpoint(() => {
      // In this example, regardless of error, we mark the shutdown operation
      // complete.
      completeCallback();
    });
  }
}
// Entry point: build the processor and hand it to the aws-kcl runner.
kcl(new RecordProcessor()).run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment