Local Kinesis Setup w/ LocalStack
#!/usr/bin/env bash
# Creates the Kinesis stream in a running LocalStack instance via `awslocal`.
# Requires KINESIS_STREAM_SHARDS and KINESIS_STREAM_NAME to be defined in ./.env.
set -euo pipefail

# Export every KEY=VALUE assignment found in .env. Sourcing under `set -a`
# copes with comment lines, blank lines and quoted values containing spaces,
# whereas `export $(cat .env | xargs)` word-splits and glob-expands them.
set -a
# shellcheck source=/dev/null
. ./.env
set +a

# NOTE(review): presumably makes awslocal talk HTTPS, matching USE_SSL=true on
# the LocalStack container — confirm against the awslocal version in use.
export USE_SSL=true

# ${VAR:?} aborts with a clear message instead of passing an empty argument.
awslocal kinesis create-stream \
  --shard-count "${KINESIS_STREAM_SHARDS:?must be set in .env}" \
  --stream-name "${KINESIS_STREAM_NAME:?must be set in .env}"
#!/usr/bin/env bash
# Pulls the LocalStack image and starts it as a detached container named
# "localstack" on the host network.
# Variables from ./.env are exported first. LOCALSTACK_DOCKER_IMAGE_TAG has no
# fallback here, while the -e values below fall back to the defaults shown.
export $(cat .env | xargs)
docker pull localstack/localstack:${LOCALSTACK_DOCKER_IMAGE_TAG}
# NOTE(review): this command ends on a line continuation with no image argument
# after it — the tail of the command appears to be missing. Restore it
# (presumably the image pulled above) before running.
docker run \
--net host \
--name localstack \
-d \
-e "SERVICES=${LOCALSTACK_SERVICES:-kinesis,dynamodb,cloudwatch}" \
-e "DEFAULT_REGION=${AWS_REGION:-us-east-1}" \
-e "USE_SSL=true" \
-e "DATA_DIR=${LOCALSTACK_DATA_DIR:-/tmp/localstack/data}" \
# Docker Compose definition for the LocalStack container.
# NOTE(review): the `services:` / `environment:` keys and the nesting were
# missing from this snippet and have been restored; the service key
# `localstack` is assumed from container_name — confirm.
version: "3.3"
services:
  localstack:
    image: "localstack/localstack:${LOCALSTACK_DOCKER_IMAGE_TAG:-0.7.5}"
    network_mode: "host"
    container_name: localstack
    environment:
      - "SERVICES=${LOCALSTACK_SERVICES:-kinesis,dynamodb,cloudwatch}"
      - "USE_SSL=true"
      - "DATA_DIR=${LOCALSTACK_DATA_DIR:-/tmp/localstack/data}"
kinesis {
akka {
# The dispatcher that will be used by default in the reference configuration
# Note that this can be overridden in the producer
# and consumer configurations via the `use-dispatcher` key.
default-dispatcher {
type = "Dispatcher"
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 8
parallelism-factor = 3.0
parallelism-max = 64
throughput = 50
# The application name is used as the name of the dynamoDB table used for checkpointing
# You SHOULD override this value
application-name = ${?APPLICATION_NAME}
# Producer definitions for direct controller events
my-producer {
stream-name: ${?KINESIS_STREAM_NAME}
my-consumer {
stream-name: ${?KINESIS_STREAM_NAME}
# The default producer configuration, used for all producers.
default-producer {
# The name of the producer stream, MUST be specified per producer
# stream-name = "kinesis-stream"
akka {
# Fully qualified config path for the producer's dispatcher
# Can be overridden per producer/consumer
# Overriding this value with null or an empty string will result in no specific dispatcher being specified
dispatcher = "kinesis.akka.default-dispatcher"
# The maximum number of concurrent message send requests before throttling
# As messages are sent in batches, a Future is created per message to track the successful (or failed)
# put of the message onto Kinesis.
# This value is optional but may help control memory usage
# Defaults to no throttling.
#max-outstanding-requests = 50000
# Optionally defined in combination with the above
# Controls the time after throttling before a retry in millis.
# Default: 100
#throttling-retry-millis = 100
# We convert these to a properties file to be fed directly into the Kinesis KPL library
# See the link below for full details
# This section can be overridden on a per producer basis
# Config values within this kpl block are optional (see defaults)
kpl {
# Enable aggregation. With aggregation, multiple user records are packed into
# a single KinesisRecord. If disabled, each user record is sent in its own
# KinesisRecord.
# If your records are small, enabling aggregation will allow you to put many
# more records than you would otherwise be able to for a shard before getting
# throttled.
# Default: true
AggregationEnabled = true
# Maximum number of items to pack into an aggregated record.
# There should be normally no need to adjust this. If you want to limit the
# time records spend buffering, look into record_max_buffered_time instead.
# Default: 4294967295
# Minimum: 1
# Maximum (inclusive): 9223372036854775807
AggregationMaxCount = 4294967295
# Maximum number of bytes to pack into an aggregated Kinesis record.
# There should be normally no need to adjust this. If you want to limit the
# time records spend buffering, look into record_max_buffered_time instead.
# If a record has more data by itself than this limit, it will bypass the
# aggregator. Note the backend enforces a limit of 50KB on record size. If
# you set this beyond 50KB, oversize records will be rejected at the backend.
# Default: 51200
# Minimum: 64
# Maximum (inclusive): 1048576
AggregationMaxSize = 51200
# Maximum number of items to pack into a PutRecords request.
# There should be normally no need to adjust this. If you want to limit the
# time records spend buffering, look into record_max_buffered_time instead.
# Default: 500
# Minimum: 1
# Maximum (inclusive): 500
CollectionMaxCount = 500
# Maximum amount of data to send with a PutRecords request.
# There should be normally no need to adjust this. If you want to limit the
# time records spend buffering, look into record_max_buffered_time instead.
# Records larger than the limit will still be sent, but will not be grouped
# with others.
# Default: 5242880
# Minimum: 52224
# Maximum (inclusive): 9223372036854775807
CollectionMaxSize = 5242880
# Timeout (milliseconds) for establishing TLS connections.
# Default: 6000
# Minimum: 100
# Maximum (inclusive): 300000
ConnectTimeout = 6000
# How often to refresh credentials (in milliseconds).
# During a refresh, credentials are retrieved from any SDK credentials providers attached to
# the wrapper and pushed to the core.
# Default: 5000
# Minimum: 1
# Maximum (inclusive): 300000
# CredentialsRefreshDelay =
# Use a custom CloudWatch endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses. There is no
# way to disable TLS. The KPL always connects with TLS.
# Expected pattern: ^([A-Za-z0-9-\\.]+)?$
CloudwatchEndpoint = ${?KPL_CLOUDWATCH_ENDPOINT}
# Server port to connect to for CloudWatch.
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
CloudwatchPort = ${?KPL_CLOUDWATCH_PORT}
# This has no effect on Windows.
# If set to true, the KPL native process will attempt to raise its own core file size soft
# limit to 128MB, or the hard limit, whichever is lower. If the soft limit is already at or
# above the target amount, it is not changed.
# Note that even if the limit is successfully raised (or already sufficient), it does not
# guarantee that core files will be written on a crash, since that is dependent on operating
# system settings that are beyond the control of individual processes.
# Default: false
#EnableCoreDumps = false
# Use a custom Kinesis endpoint.
# Mostly for testing use. Note this does not accept protocols or paths, only
# host names or ip addresses. There is no way to disable TLS. The KPL always
# connects with TLS.
# Expected pattern: ^([A-Za-z0-9-\\.]+)?$
KinesisEndpoint = ${?KPL_KINESIS_ENDPOINT}
# Server port to connect to. Only useful with KinesisEndpoint.
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
KinesisPort = ${?KPL_KINESIS_PORT}
# If true, throttled puts are not retried. The records that got throttled
# will be failed immediately upon receiving the throttling error. This is
# useful if you want to react immediately to any throttling without waiting
# for the KPL to retry. For example, you can use a different hash key to send
# the throttled record to a backup shard.
# If false, the KPL will automatically retry throttled puts. The KPL performs
# backoff for shards that it has received throttling errors from, and will
# avoid flooding them with retries. Note that records may fail from
# expiration (see record_ttl) if they get delayed for too long because of
# throttling.
# Default: false
FailIfThrottled = false
# Minimum level of logs. Messages below the specified level will not be
# logged. Logs for the native KPL daemon show up on stderr.
# Default: info
# Expected pattern: info|warning|error
LogLevel = info
# Maximum number of connections to open to the backend. HTTP requests are
# sent in parallel over multiple connections.
# Setting this too high may impact latency and consume additional resources
# without increasing throughput.
# Default: 24
# Minimum: 1
# Maximum (inclusive): 256
MaxConnections = 24
# Controls the granularity of metrics that are uploaded to CloudWatch.
# Greater granularity produces more metrics.
# When "shard" is selected, metrics are emitted with the stream name and
# shard id as dimensions. On top of this, the same metric is also emitted
# with only the stream name dimension, and lastly, without the stream name.
# This means for a particular metric, 2 streams with 2 shards (each) will
# produce 7 CloudWatch metrics, one for each shard, one for each stream, and
# one overall, all describing the same statistics, but at different levels of
# granularity.
# When "stream" is selected, per shard metrics are not uploaded; when
# "global" is selected, only the total aggregate for all streams and all
# shards are uploaded.
# Consider reducing the granularity if you're not interested in shard-level
# metrics, or if you have a large number of shards.
# If you only have 1 stream, select "global"; the global data will be
# equivalent to that for the stream.
# Refer to the metrics documentation for details about each metric.
# Default: shard
# Expected pattern: global|stream|shard
MetricsGranularity = shard
# Controls the number of metrics that are uploaded to CloudWatch.
# "none" disables all metrics.
# "summary" enables the following metrics: UserRecordsPut, KinesisRecordsPut,
# ErrorsByCode, AllErrors, BufferingTime.
# "detailed" enables all remaining metrics.
# Refer to the metrics documentation for details about each metric.
# Default: detailed
# Expected pattern: none|summary|detailed
MetricsLevel = detailed
# The namespace to upload metrics under.
# If you have multiple applications running the KPL under the same AWS
# account, you should use a different namespace for each application.
# If you are also using the KCL, you may wish to use the application name you
# have configured for the KCL as the namespace here. This way both your
# KPL and KCL metrics show up under the same namespace.
# Default: KinesisProducerLibrary
# Expected pattern: (?!AWS/).{1,255}
MetricsNamespace = KinesisProducerLibrary
# Delay (in milliseconds) between each metrics upload.
# For testing only. There is no benefit in setting this lower or higher in
# production.
# Default: 60000
# Minimum: 1
# Maximum (inclusive): 60000
MetricsUploadDelay = 60000
# Minimum number of connections to keep open to the backend.
# There should be no need to increase this in general.
# Default: 1
# Minimum: 1
# Maximum (inclusive): 16
MinConnections = 1
#Path to the native KPL binary. Only use this setting if you want to use a custom build of
#the native code.
# Limits the maximum allowed put rate for a shard, as a percentage of the
# backend limits.
# The rate limit prevents the producer from sending data too fast to a shard.
# Such a limit is useful for reducing bandwidth and CPU cycle wastage from
# sending requests that we know are going to fail from throttling.
# Kinesis enforces limits on both the number of records and number of bytes
# per second. This setting applies to both.
# The default value of 150% is chosen to allow a single producer instance to
# completely saturate the allowance for a shard. This is an aggressive
# setting. If you prefer to reduce throttling errors rather than completely
# saturate the shard, consider reducing this setting.
# Default: 150
# Minimum: 1
# Maximum (inclusive): 9223372036854775807
RateLimit = 150
# Maximum amount of time (milliseconds) a record may spend being buffered
# before it gets sent. Records may be sent sooner than this depending on the
# other buffering limits.
# This setting provides coarse ordering among records - any two records will
# be reordered by no more than twice this amount (assuming no failures and
# retries and equal network latency).
# The library makes a best effort to enforce this time, but cannot guarantee
# that it will be precisely met. In general, if the CPU is not overloaded,
# the library will meet this deadline to within 10ms.
# Failures and retries can additionally increase the amount of time records
# spend in the KPL. If your application cannot tolerate late records, use the
# record_ttl setting to drop records that do not get transmitted in time.
# Setting this too low can negatively impact throughput.
# Default: 100
# Maximum (inclusive): 9223372036854775807
RecordMaxBufferedTime = 100
# Set a time-to-live on records (milliseconds). Records that do not get
# successfully put within the limit are failed.
# This setting is useful if your application cannot or does not wish to
# tolerate late records. Records will still incur network latency after they
# leave the KPL, so take that into consideration when choosing a value for
# this setting.
# If you do not wish to lose records and prefer to retry indefinitely, set
# record_ttl to a large value like INT_MAX. This has the potential to cause
# head-of-line blocking if network issues or throttling occur. You can
# respond to such situations by using the metrics reporting functions of the
# KPL. You may also set fail_if_throttled to true to prevent automatic
# retries in case of throttling.
# Default: 30000
# Minimum: 100
# Maximum (inclusive): 9223372036854775807
RecordTtl = 30000
# Which region to send records to.
# If you do not specify the region and are running in EC2, the library will
# use the region the instance is in.
# The region is also used to sign requests.
# Expected pattern: ^([a-z]+-[a-z]+-[0-9])?$
Region = ${?AWS_REGION}
# The maximum total time (milliseconds) elapsed between when we begin a HTTP
# request and receiving all of the response. If it goes over, the request
# will be timed-out.
# Note that a timed-out request may actually succeed at the backend. Retrying
# then leads to duplicates. Setting the timeout too low will therefore
# increase the probability of duplicates.
# Default: 6000
# Minimum: 100
# Maximum (inclusive): 600000
RequestTimeout = 6000
# Temp directory into which to extract the native binaries. The KPL requires write
# permissions in this directory.
# If not specified, defaults to /tmp in Unix. (Windows TBD)
# TempDirectory
# Verify the endpoint's certificate. Do not disable unless using
# custom_endpoint for testing. Never disable this in production.
# Default: true
VerifyCertificate = ${?KPL_VERIFY_CERTIFICATE}
# Sets the threading model that the native process will use.
# Enum:
# ThreadingModel.PER_REQUEST: Tells the native process to create a thread for each request.
# ThreadingModel.POOLED: Tells the native process to use a thread pool. The size of the pool can be controlled by ThreadPoolSize
# Default = ThreadingModel.PER_REQUEST
# ThreadingModel =
# Sets the maximum number of threads that the native process' thread pool will be configured with.
# Default: 0
# ThreadPoolSize =
# The default consumer configuration, used for all consumers.
default-consumer {
# The name of the consumer stream, MUST be specified per consumer
# stream-name = "kinesis-stream"
akka {
# Fully qualified config path for the consumer's dispatcher
# Can be overridden per producer/consumer
# Overriding this value with null or an empty string will result in no specific dispatcher being specified
dispatcher = "kinesis.akka.default-dispatcher"
worker {
# This is the total timeout for processing a single batch
# All messages must have been processed and checkpointed for the batch to be considered complete.
# Should be tuned in combination with `maxRecords` which controls the size of the batch.
# Note that this will be per retry.
batchTimeoutSeconds = 10 # Given 10,000 records this requires at least 1000/sec
# The number of times to retry failed batch messages.
# A failed message is one where we received no response from the eventProcessor within the above timeout.
# A value of `1` means we retry each failed message once (thus we attempt processing failed messages twice).
failedMessageRetries = 1
# The failure tolerance percentage for each batch (rounded down).
# So for a batch of 500 messages, the default (0.25) allows 1 unprocessed message.
# If after retrying unprocessed messages we are within this tolerance then we will checkpoint the latest and continue.
# If we are above this tolerance then we will shutdown processing for the specific shard on this node.
# The eventProcessor will be notified of any shutdown by sending a KinesisProcessorFailure message.
failureTolerancePercentage = 0.25
# Automatic graceful shutdown - this uses a jvm shutdown hook to automatically stop the consumer, forcing a checkpoint
# The shutdown will block for up to the period defined in shutdownTimeoutSeconds
gracefulShutdownHook = true
# When gracefully shutting down, this is the timeout allowed for checkpointing etc.
# It is strongly recommended that this is larger than the batch timeout * retries
# This will help prevent message duplication due to unfinished batches on shutdown
shutdownTimeoutSeconds = 25
checkpointer {
# The amount of time to wait after failing to checkpoint, usually due to an exception or dynamo throttling.
backoffMillis = 3000
# The standard delay between (successful) checkpoints
intervalMillis = 2000
# The delay between notification messages sent to the parent to indicate we're ready to checkpoint
# This is for subsequent messages after the initial notification,
# for cases where the parent had no messages to checkpoint.
notificationDelayMillis = 1000
# We convert these to a properties file to be fed directly into the Kinesis KCL library
# See the link below for full details
# This section can be overridden on a per consumer basis
# Most config values within this kcl block are optional (see defaults)
kcl {
# See here for credentials providers:
# Providing a list of these providers creates a `AWSCredentialsProviderChain` where each one is tried until one succeeds
# Mandatory
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# If workerId is not provided in the properties, a workerId is automatically generated
#workerId = 123
# The location in the shard from which the KinesisClientLibrary will start fetching records from
# when the application starts for the first time and there is no checkpoint for the shard.
# Once checkpointing has begun, the application will always continue from the last checkpoint (in DynamoDB)
# See:
# Default: LATEST
initialPositionInStream = TRIM_HORIZON
# Max records to fetch from Kinesis in a single GetRecords call.
# Default = 10000
#maxRecords = 10000
# Idle time between record reads in milliseconds
# Default = 1000
#idleTimeBetweenReadsInMillis = 1000
# Fail over time in milliseconds. A worker which does not renew its lease within this time interval
# will be regarded as having problems and its shards will be assigned to other workers.
# For applications that have a large number of shards, this may be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases.
# Default: 10000
#failoverTimeMillis = 10000
# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
# Default: 60000
#shardSyncIntervalMillis = 60000
# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
# Default: true
#cleanupLeasesUponShardCompletion = true
# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to {@link RecordProcessorCheckpointer#checkpoint(String)} by default.
# Default: true
#validateSequenceNumberBeforeCheckpointing = true
# An alternative to kinesisEndpoint, sets the
# regional endpoint for this client's service calls. Use this
# method to control which AWS region you want to work with.
# Default: null
regionName = ${?AWS_REGION}
# Overrides the default endpoint for this client. e.g. ("").
# This can be used to control which AWS region they want to work with and
# will override the regionName property
# Callers can pass in just the endpoint (e.g. "")
# or a full URL, including the protocol (e.g. "").
# If the protocol is not specified here, the default protocol will be used, which is HTTPS.
# For more information on using AWS regions with the AWS SDK for Java, and
# a complete list of all available endpoints for all AWS services see:
# Default: null
kinesisEndpoint = ${?KCL_KINESIS_ENDPOINT}
# DynamoDB endpoint
# Default: null
# Don't call processRecords() on the record processor for empty record lists.
# Enables applications to flush/checkpoint (if they have some data "in progress", but don't get new data for a while)
# Default: false
#callProcessRecordsEvenForEmptyRecordList = false
# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
# Default: 10000
#parentShardPollIntervalMillis = 10000
# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
# Default: 500
#taskBackoffTimeMillis = 500
# Buffer metrics for at most this long before publishing to CloudWatch.
# Default: 10000
#metricsBufferTimeMillis = 10000
# Buffer at most this many metrics before publishing to CloudWatch.
# Default: 10000
#metricsMaxQueueSize = 10000
# Metrics level for which to enable CloudWatch metrics.
# Default: DETAILED
#metricsLevel = DETAILED
# Allowed dimensions for CloudWatch metrics.
# Operation dimension will be enabled regardless of the config provided.
# See:
# Comma separated list of values from:
# Operation, ShardId, WorkerIdentifier, ALL
# Default: Operation, ShardId
#metricsEnabledDimensions = Operation, ShardId
# The max number of leases (shards) this worker should process.
# This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
# or during deployment.
# NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the
# stream due to the max limit.
# Default: 2147483647 (Integer.MAX_VALUE)
#maxLeasesForWorker = 2147483647
# Max leases to steal from another worker at one time (for load balancing).
# Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
# but can cause higher churn in the system.
# Default: 1
#maxLeasesToStealAtOneTime = 1
# The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity.
# Default: 10
#initialLeaseTableReadCapacity = 10
# The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity.
# Default: 10
#initialLeaseTableWriteCapacity = 10
# If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases
# in the lease table. This assumes that the shards and leases are in-sync.
# This enables customers to choose faster startup times (e.g. during incremental deployments of an application).
# skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker initialization).
# Default: false
# Override the default user agent (application name)
# Default: <applicationName>
#userAgent =
# TableName name of the lease table in DynamoDB
# Default = <applicationName>
#TableName =
