Skip to content

Instantly share code, notes, and snippets.

@mkurian
Last active December 3, 2019 00:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mkurian/762d7a0691bd1f8a5f3e055d30cab4cb to your computer and use it in GitHub Desktop.
Save mkurian/762d7a0691bd1f8a5f3e055d30cab4cb to your computer and use it in GitHub Desktop.
for (i in 1..9) {
val workerId = "workerId-$i"
val kclConfig = KinesisClientLibConfiguration("fooWorker",
streamConfig.streamArn, awsAuth.credentialsProvider(), workerId)
.withMaxRecords(streamConfig.maxRecords)
.withInitialPositionInStream(InitialPositionInStream.valueOf(streamConfig.streamPosition))
.withRegionName(ddbStreamConfigProperties.region)
.withDynamoDBEndpoint(ddbStreamConfigProperties.dynamoDBEndpoint)
.withCallProcessRecordsEvenForEmptyRecordList(true)
.withParentShardPollIntervalMillis(3000)
val streamWorker = Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(kclConfig).kinesisClient(AmazonDynamoDBStreamsAdapterClient(awsAuth.credentialsProvider()))
.build()
val workerThread = Thread(streamWorker)
workerThread.name = "$workerId-${workerThread.name}"
workerThread.start()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment