@gregplaysguitar
Last active July 28, 2021 05:06
DynamoDB data streaming with localstack


This repo demonstrates how to set up change data capture for a DynamoDB table running in localstack, using either DynamoDB Streams or Kinesis Data Streams.

Requires Docker.

To run the examples, first start localstack:

    docker compose up

Test the DynamoDB Stream:

    ./test_dynamodb_stream.sh

Test the Kinesis Data Stream:

    ./test_kinesis_stream.sh

In each case, you should see details of the data changes logged to the localstack console.
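
The two approaches deliver the same change in differently shaped Lambda records, so it can help to normalize them when reading the logs. The following is a minimal sketch, not part of the original examples. It assumes a DynamoDB Streams record carries `eventName` and a `dynamodb` object (`Keys`, `NewImage`, `OldImage`) directly, and that a Kinesis record carries the same fields as base64-encoded JSON in `kinesis.data`. The helper names `extractChange` and `toChange` and the sample records are hypothetical, and the fields localstack actually emits may differ.

```javascript
"use strict";

// Normalize one Lambda event record from either source into a common shape.
// Hypothetical helper; not part of the original gist.
function extractChange(record) {
  if (record.kinesis) {
    // Kinesis Data Streams: the change arrives as base64-encoded JSON.
    const payload = JSON.parse(
      Buffer.from(record.kinesis.data, "base64").toString("utf8")
    );
    return toChange("kinesis", payload);
  }
  if (record.dynamodb) {
    // DynamoDB Streams: the change is the record itself.
    return toChange("dynamodb-stream", record);
  }
  throw new Error("Unrecognized record shape");
}

// Pick out the fields of interest, defaulting missing images to null.
function toChange(source, change) {
  const details = change.dynamodb || {};
  return {
    source,
    eventName: change.eventName,
    keys: details.Keys || null,
    newImage: details.NewImage || null,
    oldImage: details.OldImage || null,
  };
}

// Hypothetical sample records modelled on the update performed by the test scripts.
const streamRecord = {
  eventName: "MODIFY",
  dynamodb: {
    Keys: { a1: { S: "123" } },
    OldImage: { a1: { S: "123" }, a2: { S: "456" } },
    NewImage: { a1: { S: "123" }, a2: { S: "Hello world" } },
  },
};
const kinesisRecord = {
  kinesis: {
    data: Buffer.from(JSON.stringify(streamRecord)).toString("base64"),
  },
};

console.log(JSON.stringify(extractChange(streamRecord), null, 2));
console.log(JSON.stringify(extractChange(kinesisRecord), null, 2));
```

Inside a handler, this could be applied as `event.Records.map(extractChange)` before logging.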

# Compose file for localstack (e.g. docker-compose.yml)
version: "3.7"
services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME-localstack}"
    image: localstack/localstack
    hostname: localstack
    ports:
      - "4566:4566"
    environment:
      - SERVICES=dynamodb,lambda,kinesis
      - DEBUG=1
      - DATA_DIR=/tmp/localstack/data
      - LAMBDA_EXECUTOR=docker-reuse
      - LAMBDA_REMOTE_DOCKER=true
      - LAMBDA_REMOVE_CONTAINERS=true
      # - KINESIS_ERROR_PROBABILITY=${KINESIS_ERROR_PROBABILITY-0.1}
      - DOCKER_HOST=unix:///var/run/docker.sock
      - HOST_TMP_FOLDER=${TMPDIR}
    volumes:
      # - ./data:/tmp/localstack
      - "/var/run/docker.sock:/var/run/docker.sock"

// dynamodb.js
"use strict";

// Logs the full event and context received from the DynamoDB stream
exports.handler = (event, context) => {
  console.log("Executing dynamodb stream handler");
  console.log(JSON.stringify({ event, context }, null, 2));
  return true;
};

// kinesis.js
"use strict";

// Logs the full event and context, then decodes each Kinesis record payload
exports.handler = (event, context) => {
  console.log("Executing kinesis data stream handler");
  console.log(JSON.stringify({ event, context }, null, 2));
  if (event.Records) {
    // Kinesis delivers each payload as base64-encoded JSON
    for (const record of event.Records) {
      // Decode as UTF-8 so non-ASCII attribute values survive intact
      const data = JSON.parse(
        Buffer.from(record.kinesis.data, "base64").toString("utf8")
      );
      console.log("Decoded:", JSON.stringify({ data }, null, 2));
    }
  }
  return true;
};

#!/bin/bash
# test_dynamodb_stream.sh
set -euo pipefail
RESOURCE_NAME="test-$(openssl rand -hex 4)"
echo "Preparing and uploading function $RESOURCE_NAME from dynamodb.js"
zip dynamodb.zip dynamodb.js
aws --endpoint-url=http://localhost:4566 lambda create-function \
  --function-name "$RESOURCE_NAME" \
  --zip-file fileb://dynamodb.zip \
  --handler dynamodb.handler \
  --environment "Variables={PRIMARY_ELASTIC_HOST=127.0.0.1:4571,SECONDARY_ELASTIC_HOST=127.0.0.1:4571,ENV=LOCAL}" \
  --runtime 'nodejs12.x' \
  --role arn:aws:iam::000000000000:role/lambda-dotnet-ex
echo "Invoking function $RESOURCE_NAME"
aws --endpoint-url=http://localhost:4566 lambda invoke --function-name "$RESOURCE_NAME" /dev/stdout
echo "Creating dynamodb table $RESOURCE_NAME"
aws --endpoint-url=http://localhost:4566 dynamodb create-table \
  --table-name "$RESOURCE_NAME" \
  --attribute-definitions 'AttributeName=a1,AttributeType=S' \
  --key-schema 'AttributeName=a1,KeyType=HASH' \
  --provisioned-throughput 'ReadCapacityUnits=1,WriteCapacityUnits=1' \
  --stream-specification 'StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES'
echo "Retrieving ARN for dynamodb stream"
STREAM_ARN=$(aws --endpoint-url=http://localhost:4566 dynamodbstreams list-streams | jq -r ".Streams[] | select(.TableName == \"$RESOURCE_NAME\") | .StreamArn")
echo "Creating event-source mapping for stream $STREAM_ARN"
aws --endpoint-url=http://localhost:4566 lambda create-event-source-mapping \
  --function-name "$RESOURCE_NAME" \
  --event-source "$STREAM_ARN" \
  --batch-size 10 \
  --starting-position TRIM_HORIZON
echo "Putting new record to table $RESOURCE_NAME"
aws --endpoint-url=http://localhost:4566 dynamodb put-item --table-name "$RESOURCE_NAME" --item '{"a1":{"S":"123"}}'
echo "Putting existing record to table $RESOURCE_NAME"
aws --endpoint-url=http://localhost:4566 dynamodb put-item --table-name "$RESOURCE_NAME" --item '{"a1":{"S":"123"},"a2":{"S":"456"}}'
echo "Updating attribute on existing record in table $RESOURCE_NAME"
aws --endpoint-url=http://localhost:4566 dynamodb update-item \
  --table-name "$RESOURCE_NAME" \
  --key '{"a1":{"S":"123"}}' \
  --update-expression 'SET #H = :h' \
  --expression-attribute-names '{"#H":"a2"}' \
  --expression-attribute-values '{":h":{"S":"Hello world"}}'

#!/bin/bash
# test_kinesis_stream.sh
set -euo pipefail
RESOURCE_NAME="test-$(openssl rand -hex 4)"
export RESOURCE_NAME
echo "Preparing and uploading function $RESOURCE_NAME from kinesis.js"
zip kinesis.zip kinesis.js
aws --endpoint-url=http://localhost:4566 lambda create-function \
  --function-name "$RESOURCE_NAME" \
  --zip-file fileb://kinesis.zip \
  --handler kinesis.handler \
  --environment "Variables={PRIMARY_ELASTIC_HOST=127.0.0.1:4571,SECONDARY_ELASTIC_HOST=127.0.0.1:4571,ENV=LOCAL}" \
  --runtime 'nodejs12.x' \
  --role arn:aws:iam::000000000000:role/lambda-dotnet-ex
echo "Invoking function $RESOURCE_NAME"
aws --endpoint-url=http://localhost:4566 lambda invoke --function-name "$RESOURCE_NAME" /dev/stdout
echo "Creating dynamodb table $RESOURCE_NAME"
aws --endpoint-url=http://localhost:4566 dynamodb create-table \
  --table-name "$RESOURCE_NAME" \
  --attribute-definitions 'AttributeName=a1,AttributeType=S' \
  --key-schema 'AttributeName=a1,KeyType=HASH' \
  --provisioned-throughput 'ReadCapacityUnits=1,WriteCapacityUnits=1'
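echo "Creating kinesis stream $RESOURCE_NAME"
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
  --stream-name "$RESOURCE_NAME" \
  --shard-count 3
# Stream creation is asynchronous; wait until the stream is active before reading its ARN
aws --endpoint-url=http://localhost:4566 kinesis wait stream-exists \
  --stream-name "$RESOURCE_NAME"
STREAM_ARN=$(aws --endpoint-url=http://localhost:4566 kinesis describe-stream \
  --stream-name "$RESOURCE_NAME" |
  jq -r ".StreamDescription.StreamARN")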
echo "Connecting table $RESOURCE_NAME to kinesis stream $STREAM_ARN"
aws --endpoint-url=http://localhost:4566 dynamodb enable-kinesis-streaming-destination \
  --table-name "$RESOURCE_NAME" \
  --stream-arn "$STREAM_ARN"
echo "Creating event-source mapping for stream $STREAM_ARN"
aws --endpoint-url=http://localhost:4566 lambda create-event-source-mapping \
  --function-name "$RESOURCE_NAME" \
  --event-source "$STREAM_ARN" \
  --batch-size 10 \
  --starting-position TRIM_HORIZON
echo "Putting new record to table $RESOURCE_NAME"
aws --endpoint-url=http://localhost:4566 dynamodb put-item --table-name "$RESOURCE_NAME" --item '{"a1":{"S":"123"}}'
echo "Putting existing record to table $RESOURCE_NAME"
aws --endpoint-url=http://localhost:4566 dynamodb put-item --table-name "$RESOURCE_NAME" --item '{"a1":{"S":"123"},"a2":{"S":"456"}}'
echo "Updating attribute on existing record in table $RESOURCE_NAME"
aws --endpoint-url=http://localhost:4566 dynamodb update-item \
  --table-name "$RESOURCE_NAME" \
  --key '{"a1":{"S":"123"}}' \
  --update-expression 'SET #H = :h' \
  --expression-attribute-names '{"#H":"a2"}' \
  --expression-attribute-values '{":h":{"S":"Hello world"}}'