Skip to content

Instantly share code, notes, and snippets.

@jiexi
Last active July 28, 2020 21:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jiexi/a30dadcff08cbba06b16bda70fba6f13 to your computer and use it in GitHub Desktop.
Save jiexi/a30dadcff08cbba06b16bda70fba6f13 to your computer and use it in GitHub Desktop.
Nodejs Kinesis Consumer/Producer + LocalStack
The Java runtime flags needed to communicate with LocalStack cannot be set
from the command line when using v2 of the AWS Java SDK. Every version of the
aws-kcl npm module newer than commit 15cec60231167a89e4debcd0d58bfaaf5d666aa0
uses v2 of the Java SDK. To develop a Kinesis consumer in Node.js, you
therefore need aws-kcl v0.8.0 together with the patched kcl-bootstrap
attached. This limitation applies only to the consumer, not to the producer.
/***
Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://aws.amazon.com/asl/
or in the "license" file accompanying this file. This file is distributed
on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing
permissions and limitations under the License.
***/
'use strict';
// Kinesis client options for the producer: the LocalStack endpoint, served over HTTPS.
var kinesisSettings = {
  region: 'us-east-1',
  endpoint: "https://localhost:4568"
};

// Stream to create/write to, the shard count to create it with, and the poll interval used
// while waiting for the stream to become ACTIVE.
var sampleProducerSettings = {
  stream: 'kclnodejssample',
  shards: 2,
  waitBetweenDescribeCallsInSeconds: 5
};

var config = {
  kinesis: kinesisSettings,
  sampleProducer: sampleProducerSettings
};
module.exports = config;
# NOTE(review): `latest` may drift; the configs in this setup talk to Kinesis on port 4568 and
# DynamoDB on port 4569, so consider pinning a LocalStack tag known to serve those ports.
FROM localstack/localstack:latest
# Install jq. --no-cache already avoids keeping the apk index; the rm is only a safety net.
RUN apk add --no-cache jq && \
    rm -rf /var/cache/apk/*
# The space before each trailing backslash keeps every variable separate even if a continuation
# line is not indented (previously "USE_SSL=true\" could glue onto the next line).
# USE_SSL=true matches the https:// endpoint used by config.js.
ENV DATA_DIR=/opt/localstack-data \
    USE_SSL=true \
    SERVICES=kinesis,dynamodb,cloudwatch
# setup_localstack.sh provisions the Kinesis stream on LocalStack startup.
COPY setup_localstack.sh /docker-entrypoint-initaws.d/
#!/usr/bin/env node
/***
NOTE(JL): This is a modified version of the kcl-bootstrap found in
aws-kcl commit 15cec60231167a89e4debcd0d58bfaaf5d666aa0 (HEAD, tag: v0.8.0).
Specifically, it fixes a bug with http/https fetching for Maven packages and
adds two flags, '-Dcom.amazonaws.sdk.disableCbor=true' and '-Dcom.amazonaws.sdk.disableCertChecking',
to the Java runtime, which allow it to communicate with LocalStack.
Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://aws.amazon.com/asl/
or in the "license" file accompanying this file. This file is distributed
on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing
permissions and limitations under the License.
***/
'use strict';
var fs = require('fs');
var http = require('http');
var https = require('https');
var path = require('path');
var program = require('commander');
var spawn = require('child_process').spawn;
var url = require('url');
var util = require('util');
// Maven coordinates (groupId, artifactId, version) of every jar the Multi-Language Daemon needs on
// its classpath; each triple is expanded into a package-info object by getMavenPackageInfo.
var MAVEN_PACKAGE_LIST = [
  ['com.amazonaws', 'amazon-kinesis-client', '1.9.3'],
  ['com.amazonaws', 'aws-java-sdk-dynamodb', '1.11.438'],
  ['com.amazonaws', 'aws-java-sdk-s3', '1.11.438'],
  ['com.amazonaws', 'aws-java-sdk-kms', '1.11.438'],
  ['com.amazonaws', 'aws-java-sdk-core', '1.11.438'],
  ['org.apache.httpcomponents', 'httpclient', '4.5.5'],
  ['org.apache.httpcomponents', 'httpcore', '4.4.9'],
  ['commons-codec', 'commons-codec', '1.10'],
  ['software.amazon.ion', 'ion-java', '1.0.2'],
  ['com.fasterxml.jackson.core', 'jackson-databind', '2.6.7.1'],
  ['com.fasterxml.jackson.core', 'jackson-annotations', '2.6.0'],
  ['com.fasterxml.jackson.core', 'jackson-core', '2.6.7'],
  ['com.fasterxml.jackson.dataformat', 'jackson-dataformat-cbor', '2.6.7'],
  ['joda-time', 'joda-time', '2.8.1'],
  ['com.amazonaws', 'jmespath-java', '1.11.438'],
  ['com.amazonaws', 'aws-java-sdk-kinesis', '1.11.438'],
  ['com.amazonaws', 'aws-java-sdk-cloudwatch', '1.11.438'],
  ['com.google.guava', 'guava', '26.0-jre'],
  ['com.google.code.findbugs', 'jsr305', '3.0.2'],
  ['org.checkerframework', 'checker-qual', '2.5.2'],
  ['com.google.errorprone', 'error_prone_annotations', '2.1.3'],
  ['com.google.j2objc', 'j2objc-annotations', '1.1'],
  ['org.codehaus.mojo', 'animal-sniffer-annotations', '1.14'],
  ['com.google.protobuf', 'protobuf-java', '2.6.1'],
  ['org.apache.commons', 'commons-lang3', '3.7'],
  ['commons-logging', 'commons-logging', '1.1.3']
].map(function (coordinates) {
  return getMavenPackageInfo(coordinates[0], coordinates[1], coordinates[2]);
});
// Default jar download directory: ../lib/jars relative to this script's directory.
var DEFAULT_JAR_PATH = path.resolve(__dirname, '..', 'lib', 'jars');
// Java main class launched by startKinesisClientLibraryApplication.
var MULTI_LANG_DAEMON_CLASS = 'com.amazonaws.services.kinesis.multilang.MultiLangDaemon';
// Cap on HTTP requests per download in httpDownloadFile, counting the initial request
// (so at most MAX_HTTP_REDIRECT_FOLLOW - 1 redirects are followed).
var MAX_HTTP_REDIRECT_FOLLOW = 3;
/**
 * Entry point: parses the command line, makes sure every jar in MAVEN_PACKAGE_LIST is present in the
 * jar directory (downloading missing ones), then prints and optionally launches the MultiLangDaemon.
 */
function bootstrap() {
  var options = parseArguments();
  var onDownloadsFinished = function (downloadError) {
    if (downloadError) {
      // errorExit terminates the process, so the daemon is never started after a failed download.
      errorExit(util.format('Unable to download Multi-Language Daemon jar files from maven: %s', downloadError));
    }
    startKinesisClientLibraryApplication(options);
  };
  downloadMavenPackages(MAVEN_PACKAGE_LIST, options.jarPath, onDownloadsFinished);
}
/**
 * Parses and validates the command line, exiting the process (via invalidInvocationExit) on bad input.
 *
 * @returns {{properties: string, java: ?string, jarPath: string, execute: ?boolean}} Validated options.
 *     java falls back to $JAVA_HOME/bin/java, and jarPath falls back to DEFAULT_JAR_PATH.
 */
function parseArguments() {
  program
    .option('-p, --properties <properties file>', 'properties file with multi-language daemon options')
    .option('-j, --java [java path]', 'path to java executable - defaults to using JAVA_HOME environment variable to get java path (optional)')
    .option('-c, --jar-path [jar path]', 'path where all multi-language daemon jar files will be downloaded (optional)')
    .option('-e, --execute', 'execute the KCL application')
    .parse(process.argv);

  var javaExecutable = program.java;
  if (!javaExecutable) {
    var javaHome = process.env.JAVA_HOME;
    javaExecutable = javaHome ? path.join(javaHome, 'bin', 'java') : null;
  }

  var args = {
    'properties': program.properties,
    'java': javaExecutable,
    'jarPath': program.jarPath || DEFAULT_JAR_PATH,
    'execute': program.execute
  };

  if (!args.properties) {
    invalidInvocationExit(program, 'Specify a valid --properties value.', true);
  }
  if (!isFile(args.properties)) {
    invalidInvocationExit(program, args.properties + ' file does not exist. Specify a valid --properties value.', true);
  }
  if (!isFile(args.java)) {
    invalidInvocationExit(program, 'Valid --java value is required or alternatively JAVA_HOME environment variable must be set.', true);
  }

  // Only the default jar directory is created on demand; a user-supplied --jar-path must already exist.
  var usingDefaultJarPath = args.jarPath === DEFAULT_JAR_PATH;
  if (usingDefaultJarPath) {
    createDirectory(args.jarPath);
  } else if (!isDirectory(args.jarPath)) {
    invalidInvocationExit(program, 'Path specified with --jar-path must already exist and must be a directory.', false);
  }
  return args;
}
/**
 * Prints the java command line for the MultiLangDaemon and, when --execute was given, runs it with
 * inherited stdio.
 *
 * The two -D flags disable CBOR and TLS certificate checking in the AWS Java SDK, which this setup
 * needs in order to talk to LocalStack.
 *
 * When executed, a failure to spawn java is reported through errorExit, and this process exits with
 * the daemon's exit code (1 if the daemon was terminated by a signal).
 *
 * @param {Object} options Parsed arguments from parseArguments() (java, jarPath, properties, execute).
 */
function startKinesisClientLibraryApplication(options) {
  var classpath = getClasspath(options).join(getPathDelimiter());
  var java = options.java;
  var args = ['-Dcom.amazonaws.sdk.disableCbor=true', '-Dcom.amazonaws.sdk.disableCertChecking', '-cp', classpath, MULTI_LANG_DAEMON_CLASS, options.properties];
  var cmd = java + ' ' + args.join(' ');
  console.log("==========================================================");
  console.log(cmd);
  console.log("==========================================================");
  if (options.execute) {
    console.log("Starting Multi-Lang Daemon ...");
    var child = spawn(java, args, { stdio: 'inherit' });
    // Without an 'error' listener, a failed spawn would crash with an unhandled 'error' event.
    child.on('error', function (err) {
      errorExit(util.format('Unable to start Multi-Language Daemon: %s', err));
    });
    // Forward the daemon's status so supervisors (e.g. a container runtime) can see failures.
    child.on('exit', function (code) {
      process.exit(code === null ? 1 : code);
    });
  }
}
/**
 * Builds the Java classpath entries, in this order:
 * 1. every regular file in options.jarPath, as absolute paths;
 * 2. the current working directory;
 * 3. the directory containing the properties file.
 *
 * @param {Object} options Parsed arguments providing jarPath and properties.
 * @returns {string[]} Classpath entries; the caller joins them with the platform delimiter.
 */
function getClasspath(options) {
  var entries = fs.readdirSync(options.jarPath)
    .map(function (name) { return path.join(options.jarPath, name); })
    .filter(isFile)
    .map(function (candidate) { return path.resolve(candidate); });
  entries.push(path.resolve('.'), path.dirname(path.resolve(options.properties)));
  return entries;
}
/**
 * Starts downloading every package in mavenPackages into destinationDirectory at once. Jars that
 * already exist are skipped by downloadMavenPackage.
 *
 * @param {Array<Object>} mavenPackages Package-info objects from getMavenPackageInfo.
 * @param {string} destinationDirectory Directory the jars are written to.
 * @param {function(*)} callback Invoked once, asynchronously: with the first error, or with null
 *     once every package is present.
 */
function downloadMavenPackages(mavenPackages, destinationDirectory, callback) {
  var remainingPackages = mavenPackages.length;
  var callbackInvoked = false;
  // With nothing to download, no per-package callback would ever fire. Complete right away
  // (asynchronously, like the non-empty case) instead of hanging forever.
  if (remainingPackages === 0) {
    process.nextTick(function () {
      callback(null);
    });
    return;
  }
  var downloadMavenPackageCallback = function (err, filePath) {
    remainingPackages = remainingPackages - 1;
    if (callbackInvoked) {
      // An earlier error was already reported; later results are ignored.
      return;
    }
    if (!err) {
      console.log(filePath + ' downloaded. ' + remainingPackages + ' files remain.');
    }
    if (err || remainingPackages === 0) {
      callbackInvoked = true;
      callback(err);
    }
  };
  for (var i = 0; i < mavenPackages.length; ++i) {
    downloadMavenPackage(mavenPackages[i], destinationDirectory, downloadMavenPackageCallback);
  }
}
/**
 * Ensures one Maven package's jar exists in destinationDirectory, downloading it if absent.
 * The work is deferred with process.nextTick, so the callback is never invoked synchronously.
 *
 * @param {Object} mavenPackage Package-info object (groupId, artifactId, version).
 * @param {string} destinationDirectory Directory the jar is stored in.
 * @param {function(*, string=)} callback Called with (null, filePath) on success or with an error.
 */
function downloadMavenPackage(mavenPackage, destinationDirectory, callback) {
  process.nextTick(function () {
    var info = getMavenPackageUrlInfo(mavenPackage);
    var target = path.join(destinationDirectory, info.fileName);
    var alreadyPresent = fs.existsSync(target);
    if (alreadyPresent) {
      // Reuse the previously downloaded jar.
      callback(null, target);
    } else {
      httpDownloadFile(info.url, target, 0, callback);
    }
  });
}
/**
 * Downloads requestUrl into destinationFile, following HTTP redirects.
 *
 * @param {string} requestUrl Absolute http:// or https:// URL to fetch.
 * @param {string} destinationFile Path the response body is written to.
 * @param {number} redirectCount Redirects followed so far (0 for the initial request).
 * @param {function(*, string=)} callback Invoked once with (null, destinationFile) on success, or
 *     with an error (an Error or a message string) on failure.
 */
function httpDownloadFile(requestUrl, destinationFile, redirectCount, callback) {
  if (redirectCount >= MAX_HTTP_REDIRECT_FOLLOW) {
    callback('Reached maximum redirects. ' + requestUrl + ' could not be downloaded.');
    return;
  }
  var parsedUrl = url.parse(requestUrl);
  var protocol = (parsedUrl.protocol === 'https:' ? https : http);
  var options = {
    hostname: parsedUrl.hostname,
    // Previously dropped, so URLs with an explicit port were fetched from the default port.
    port: parsedUrl.port,
    path: parsedUrl.path,
    agent: false
  };

  var settled = false;
  // Reports the outcome at most once, even when several events ('finish', 'close', 'error') fire.
  var settle = function(err, filePath) {
    if (settled) {
      return;
    }
    settled = true;
    callback(err, filePath);
  };
  // Reports err after removing any partially written file, so downloadMavenPackage does not mistake
  // it for a complete jar on the next run.
  var failAndCleanUp = function(err) {
    if (settled) {
      return;
    }
    settled = true;
    // fs.unlink requires a callback on current Node.js versions. Unlink errors (e.g. the file was
    // never created) are deliberately ignored.
    fs.unlink(destinationFile, function() {
      callback(err);
    });
  };

  protocol.get(options, function(response) {
    var statusCode = response.statusCode;
    // Non-2XX response.
    if (statusCode < 200 || statusCode >= 300) {
      // Drain the unused body so the socket is released.
      response.resume();
      if (statusCode >= 300 && statusCode < 400 && response.headers.location) {
        // The redirected request now owns the outcome; this one must not report anything else.
        settled = true;
        // Location may be relative, so resolve it against the URL that returned it.
        httpDownloadFile(url.resolve(requestUrl, response.headers.location), destinationFile, redirectCount + 1, callback);
        return;
      }
      settle(requestUrl + ' could not be downloaded: ' + statusCode);
      return;
    }
    var destinationFileStream = fs.createWriteStream(destinationFile);
    var destinationFileStreamFinishCallback = function() {
      settle(null, destinationFile);
    };
    destinationFileStream.on('error', failAndCleanUp);
    response.on('error', function(err) {
      // pipe() does not end the destination when the source fails, so close it explicitly.
      destinationFileStream.end();
      failAndCleanUp(err);
    });
    destinationFileStream.on('finish', destinationFileStreamFinishCallback);
    // Older Node.js version may not support 'finish' event.
    destinationFileStream.on('close', destinationFileStreamFinishCallback);
    response.pipe(destinationFileStream);
  }).on('error', failAndCleanUp);
}
/**
 * Maps Maven coordinates to the jar's file name and its download URL on repo1.maven.org, laid out as
 * <group path>/<artifactId>/<version>/<artifactId>-<version>.jar.
 *
 * @param {Object} mavenPackage Package-info object (groupId, artifactId, version).
 * @returns {{url: string, fileName: string}} Download URL and bare jar file name.
 */
function getMavenPackageUrlInfo(mavenPackage) {
  var jarName = util.format('%s-%s.jar', mavenPackage.artifactId, mavenPackage.version);
  // Each dot-separated groupId component becomes one path segment.
  var segments = mavenPackage.groupId.split('.')
    .concat([mavenPackage.artifactId, mavenPackage.version, jarName]);
  return {
    'url': "https://repo1.maven.org/maven2/" + segments.join('/'),
    'fileName': jarName
  };
}
/**
 * Bundles Maven coordinates into the package-info object consumed by the download helpers.
 *
 * @param {string} groupId Maven group id (dot separated).
 * @param {string} artifactId Maven artifact id.
 * @param {string} version Artifact version.
 * @returns {{groupId: string, artifactId: string, version: string}} Package-info object.
 */
function getMavenPackageInfo(groupId, artifactId, version) {
  var info = {};
  info.groupId = groupId;
  info.artifactId = artifactId;
  info.version = version;
  return info;
}
/**
 * @param {string} dirPath Path to check.
 * @returns {boolean} true only if dirPath exists and is a directory; any stat failure yields false.
 */
function isDirectory(dirPath) {
  var stats;
  try {
    stats = fs.statSync(dirPath);
  } catch (e) {
    // Path does not exist (or cannot be stat'ed).
    return false;
  }
  return stats.isDirectory();
}
/**
 * Creates a single directory level, treating "already exists" (EEXIST) as success.
 *
 * @param {string} dirPath Directory to create; its parent must already exist.
 * @throws Any mkdir error other than EEXIST (e.g. ENOENT when the parent is missing).
 */
function createDirectory(dirPath) {
  try {
    fs.mkdirSync(dirPath);
  } catch (e) {
    if (e.code === 'EEXIST') {
      return;
    }
    throw e;
  }
}
/**
 * @param {string} filePath Path to check.
 * @returns {boolean} true only if filePath exists and is a regular file; any stat failure yields false.
 */
function isFile(filePath) {
  var stats;
  try {
    stats = fs.statSync(filePath);
  } catch (e) {
    // Path does not exist (or cannot be stat'ed).
    return false;
  }
  return stats.isFile();
}
/**
 * @returns {string} The classpath separator: path.delimiter when available, otherwise ';' on Windows
 *     and ':' elsewhere (older Node.js versions may not support path.delimiter).
 */
function getPathDelimiter() {
  var isWindows = /^win/.test(process.platform);
  return path.delimiter || (isWindows ? ';' : ':');
}
/**
 * Reports a command-line usage error to stderr, optionally prints the commander help, then exits
 * with status 1.
 *
 * @param {Object} prog The commander program (used for outputHelp()).
 * @param {*} err Message describing the problem.
 * @param {boolean} showHelp Whether to print the usage help after the error.
 */
function invalidInvocationExit(prog, err, showHelp) {
  var lines = ['', util.format('ERROR: %s', err), ''];
  lines.forEach(function (line) {
    console.error(line);
  });
  if (showHelp) {
    prog.outputHelp();
  }
  process.exit(1);
}
/**
 * Prints err to stderr, framed by blank lines, and terminates the process with exit status 1.
 *
 * @param {*} err Error or message to report.
 */
function errorExit(err) {
  var message = util.format('ERROR: %s', err);
  console.error('');
  console.error(message);
  console.error('');
  process.exit(1);
}
// Entry point: runs as soon as the script is loaded.
bootstrap();
{
"name": "aws-kcl",
"description": "Kinesis Client Library (KCL) in Node.js.",
"version": "0.8.0",
"author": {
"name": "Amazon Web Services",
"url": "http://aws.amazon.com/"
},
"main": "index.js",
"engines": {
"node": ">= 0.8.0"
},
"bin": {
"kcl-bootstrap": "bin/kcl-bootstrap",
"kcl-bootstrap.bat": "bin/kcl-bootstrap.bat"
},
"scripts": {
"build": "grunt build",
"compile": "grunt compile",
"clean": "grunt clean",
"test": "grunt test",
"release": "grunt release",
"doc": "grunt jsdoc"
},
"dependencies": {
"commander": "~2.6.0",
"machina": "~1.0.0-1"
},
"devDependencies": {
"async": "~0.9.0",
"aws-sdk": "2.x",
"blanket": "~1.1.5",
"chai": "^1.10.0",
"grunt": "~0.4.5",
"grunt-cli": "~0.1.13",
"grunt-contrib-clean": "~0.6.0",
"grunt-contrib-jshint": "~0.6.4",
"grunt-jsdoc": "~0.5.8",
"grunt-mocha-test": "~0.12.7",
"log4js": "~0.6.22",
"mocha": "^2.1.0",
"sinon": "^1.12.2"
},
"homepage": "https://github.com/awslabs/amazon-kinesis-client-nodejs",
"repository": {
"type": "git",
"url": "git://github.com/awslabs/amazon-kinesis-client-nodejs.git"
},
"bugs": {
"url": "https://github.com/awslabs/amazon-kinesis-client-nodejs/issues"
},
"license": "Amazon Software License",
"keywords": [
"api",
"amazon",
"aws",
"big data",
"kinesis",
"kinesis client library",
"kcl",
"node.js"
]
}
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process.
streamName = kclnodejssample
# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = kclnodejssample
# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = nodejs/0.10
# Valid options are TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON
# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.
# Region of the stream for the KCL.
regionName = us-east-1
# Custom Kinesis Endpoint
kinesisEndpoint = localhost:4568
# Custom DynamoDB Endpoint
dynamoDBEndpoint = localhost:4569
# Disables calls to CloudWatch (we can't define a custom CloudWatch endpoint)
MetricsLevel = none
# Failover time in milliseconds. A worker which does not renew its lease within this time interval
# will be regarded as having problems, and its shards will be assigned to other workers.
# For applications that have a large number of shards, this may be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases.
#failoverTimeMillis = 10000
# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId =
# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000
# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000
# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000
# Enables applications to flush/checkpoint (if they have some data "in progress" but don't get new data for a while)
#callProcessRecordsEvenForEmptyRecordList = false
# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000
# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true
# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500
# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000
# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000
# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true
# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
/***
Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://aws.amazon.com/asl/
or in the "license" file accompanying this file. This file is distributed
on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing
permissions and limitations under the License.
***/
'use strict';
var fs = require('fs');
var path = require('path');
var util = require('util');
var kcl = require('../../..');
var logger = require('../../util/logger');
/**
 * A simple implementation for the record processor (consumer) that simply writes the data to a log file.
 *
 * Be careful not to use the 'stderr'/'stdout'/'console' as log destination since it is used to communicate with the
 * {https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java MultiLangDaemon}.
 *
 * Every handler is given a completeCallback. Whenever a checkpoint is taken, completeCallback is
 * invoked only after the checkpoint callback fires, whether the checkpoint succeeded or failed
 * (failures are logged).
 *
 * @returns {Object} Record processor with initialize, processRecords, shutdownRequested and shutdown handlers.
 */
function recordProcessor() {
  var log = logger().getLogger('recordProcessor');
  var shardId;

  // Logs a failed checkpoint instead of silently ignoring it.
  function logCheckpointError(stage, err) {
    log.error(util.format('Checkpoint failed during %s. ShardID: %s, Error: %s', stage, shardId, err));
  }

  return {
    initialize: function(initializeInput, completeCallback) {
      // Remembered so every log line can identify the shard.
      shardId = initializeInput.shardId;
      completeCallback();
    },

    processRecords: function(processRecordsInput, completeCallback) {
      if (!processRecordsInput || !processRecordsInput.records) {
        completeCallback();
        return;
      }
      var records = processRecordsInput.records;
      var record, data, sequenceNumber, partitionKey;
      for (var i = 0 ; i < records.length ; ++i) {
        record = records[i];
        // Record payloads arrive base64-encoded; Buffer.from replaces the deprecated Buffer constructor.
        data = Buffer.from(record.data, 'base64').toString();
        sequenceNumber = record.sequenceNumber;
        partitionKey = record.partitionKey;
        log.info(util.format('ShardID: %s, Record: %s, SeqenceNumber: %s, PartitionKey:%s', shardId, data, sequenceNumber, partitionKey));
      }
      if (!sequenceNumber) {
        // No sequence number seen (e.g. an empty batch): nothing to checkpoint.
        completeCallback();
        return;
      }
      // Checkpoint at the last record of the batch. If checkpointing, completeCallback should only be
      // called once checkpoint is complete.
      processRecordsInput.checkpointer.checkpoint(sequenceNumber, function(err, checkpointedSequenceNumber) {
        if (err) {
          logCheckpointError('processRecords', err);
        }
        else {
          log.info(util.format('Checkpoint successful. ShardID: %s, SeqenceNumber: %s', shardId, checkpointedSequenceNumber));
        }
        completeCallback();
      });
    },

    shutdownRequested: function(shutdownRequestedInput, completeCallback) {
      shutdownRequestedInput.checkpointer.checkpoint(function (err) {
        if (err) {
          logCheckpointError('shutdownRequested', err);
        }
        completeCallback();
      });
    },

    shutdown: function(shutdownInput, completeCallback) {
      // Checkpoint should only be performed when shutdown reason is TERMINATE.
      if (shutdownInput.reason !== 'TERMINATE') {
        completeCallback();
        return;
      }
      // Whenever checkpointing, completeCallback should only be invoked once checkpoint is complete.
      shutdownInput.checkpointer.checkpoint(function(err) {
        if (err) {
          logCheckpointError('shutdown', err);
        }
        completeCallback();
      });
    }
  };
}

kcl(recordProcessor()).run();
/***
Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://aws.amazon.com/asl/
or in the "license" file accompanying this file. This file is distributed
on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing
permissions and limitations under the License.
***/
'use strict';
var AWS = require('aws-sdk');
var config = require('./config');
var producer = require('./sample_producer');
var https = require('https');
// TLS certificate verification is disabled so the client accepts LocalStack's certificate
// (presumably self-signed — confirm). The agent is scoped to this Kinesis client rather than set
// globally through AWS.config, so any other AWS client in the process keeps verifying
// certificates. Never use this agent against real AWS endpoints.
var insecureLocalStackAgent = new https.Agent({ rejectUnauthorized: false });
var kinesis = new AWS.Kinesis(Object.assign({}, config.kinesis, {
  httpOptions: { agent: insecureLocalStackAgent }
}));
producer(kinesis, config.sampleProducer).run();
/***
Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://aws.amazon.com/asl/
or in the "license" file accompanying this file. This file is distributed
on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing
permissions and limitations under the License.
***/
'use strict';
var util = require('util');
var logger = require('../../util/logger');
/**
 * Creates a sample producer. It makes sure the configured Kinesis stream exists and is ACTIVE, then
 * writes ten random sensor readings to it, one per second.
 *
 * @param {Object} kinesis Kinesis client exposing callback-style createStream, describeStream and putRecord.
 * @param {Object} config Producer settings:
 *     - stream: stream name;
 *     - shards: shard count used when creating the stream;
 *     - waitBetweenDescribeCallsInSeconds: poll interval while waiting for ACTIVE.
 * @returns {{run: function()}} Object whose run() starts the producer.
 */
function sampleProducer(kinesis, config) {
  var log = logger().getLogger('sampleProducer');

  // Creates the stream, treating ResourceInUseException as "already created", then waits for ACTIVE.
  function _createStreamIfNotCreated(callback) {
    var params = {
      ShardCount : config.shards,
      StreamName : config.stream
    };
    kinesis.createStream(params, function(err, data) {
      if (err) {
        if (err.code !== 'ResourceInUseException') {
          callback(err);
          return;
        }
        log.info(util.format('%s stream is already created. Re-using it.', config.stream));
      }
      else {
        log.info(util.format("%s stream doesn't exist. Created a new stream with that name ..", config.stream));
      }
      // Poll to make sure stream is in ACTIVE state before start pushing data.
      _waitForStreamToBecomeActive(callback);
    });
  }

  // Polls describeStream every config.waitBetweenDescribeCallsInSeconds until the stream is ACTIVE.
  function _waitForStreamToBecomeActive(callback) {
    kinesis.describeStream({StreamName : config.stream}, function(err, data) {
      if (err) {
        // Report the failure; previously it was dropped and the caller was never called back.
        callback(err);
        return;
      }
      var status = data.StreamDescription.StreamStatus;
      log.info(util.format('Current status of the stream is %s.', status));
      if (status === 'ACTIVE') {
        callback(null);
        return;
      }
      setTimeout(function() {
        _waitForStreamToBecomeActive(callback);
      }, 1000 * config.waitBetweenDescribeCallsInSeconds);
    });
  }

  // Puts one random sensor reading, partitioned by sensor id; failures are logged, not retried.
  function _writeToKinesis() {
    // Epoch milliseconds. getMilliseconds() only returned the 0-999 millisecond component of the clock.
    var currTime = new Date().getTime();
    var sensor = 'sensor-' + Math.floor(Math.random() * 100000);
    var reading = Math.floor(Math.random() * 1000000);
    var record = JSON.stringify({
      time : currTime,
      sensor : sensor,
      reading : reading
    });
    var recordParams = {
      Data : record,
      PartitionKey : sensor,
      StreamName : config.stream
    };
    kinesis.putRecord(recordParams, function(err, data) {
      if (err) {
        log.error(err);
      }
      else {
        log.info('Successfully sent data to Kinesis.');
      }
    });
  }

  return {
    run: function() {
      _createStreamIfNotCreated(function(err) {
        if (err) {
          log.error(util.format('Error creating stream: %s', err));
          return;
        }
        // One record per second for ten seconds. Scheduling every write with the same 1000 ms delay
        // used to fire all ten at once.
        for (var count = 0; count < 10; count++) {
          setTimeout(_writeToKinesis, 1000 * (count + 1));
        }
      });
    }
  };
}
module.exports = sampleProducer;
#!/usr/bin/env bash
set -euo pipefail
# This script is run on startup of LocalStack to provision the environment with what we need.
# The stream name and shard count must match config.js (sampleProducer.stream / shards) and the
# consumer's properties file (streamName).
STREAM_NAME=kclnodejssample
SHARD_COUNT=2
# State may survive restarts (DATA_DIR is set in the Dockerfile), so only create the stream when it
# is missing. Otherwise create-stream would fail and abort the script under `set -e`.
if ! awslocal kinesis describe-stream --stream-name "$STREAM_NAME" > /dev/null 2>&1; then
  awslocal kinesis create-stream --shard-count "$SHARD_COUNT" --stream-name "$STREAM_NAME"
fi
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment