Skip to content

Instantly share code, notes, and snippets.

@stephnr
Created February 2, 2018 15:11
Show Gist options
  • Save stephnr/ca9b44bc5cadae8bb6e6b422d5ec2670 to your computer and use it in GitHub Desktop.
Save stephnr/ca9b44bc5cadae8bb6e6b422d5ec2670 to your computer and use it in GitHub Desktop.
Tail a kinesis stream for decrypted records
#!/bin/bash
red=$'\e[1;31m'
green=$'\e[1;32m'
yellow=$'\e[1;33m'
blue=$'\e[1;34m'
magenta=$'\e[1;35m'
cyan=$'\e[1;36m'
dim=$'\e[2m'
end=$'\e[0m'
# Detect if jq is installed
hash jq 2>/dev/null || { printf "${yellow}jq${end} command is required but is not installed.\n\n${dim}Please download from here >>${end} ${cyan}https://stedolan.github.io/jq/download/${end}\n"; exit 1; }
if [ "$1" == "help" ] || [ $# -eq 0 ]; then
printf "Usage: ${dim}tailKinesisRecords --stream-name=my-stream [options]${end}\n\n"
printf "tails a kinesis stream and prints the data contents to stdout\n\n"
printf "${cyan}Flags:${end}\n\n"
printf "%-30s%s\n" "-s STREAM_NAME" "${dim}the name of the kinesis stream${end}"
printf "%-30s%s\n" "-i IDX" "${dim}the index of the shard to use. [Default: 0]${end}"
printf "%-30s%s\n" "-t <TRIM_HORIZON|LATEST>" "${dim}the type of kinesis iterator. Defaults to LATEST${end}"
printf "%-30s%s\n" "-n COUNT" "${dim}how many records to be returned from each call to kinesis. [Default: 1]${end}"
printf "%-30s%s\n" "-h URL" "${dim}host url of the kinesis stream. Defaults to AWS${end}"
printf "%-30s%s\n" "-p AWS_PROFILE_NAME" "${dim}name of the AWS profile to use. Defaults to value of AWS_DEFAULT_PROFILE variable${end}"
exit 0
fi
# Defaults
EndpointUrl=""
Limit=1
Profile=$AWS_DEFAULT_PROFILE
ShardId=0
ShardIteratorType="LATEST"
MY_OPTS=":s:i:t:n:h:p:"
while getopts ${MY_OPTS} opt; do
case $opt in
s) StreamName=$OPTARG ;;
i)
if [[ $OPTARG =~ ^[[:digit:]]+$ ]]; then
ShardId=$OPTARG
fi
;;
t) ShardIteratorType=$OPTARG ;;
n) Limit=$OPTARG ;;
h) EndpointUrl=$OPTARG ;;
p) Profile=$OPTARG ;;
\?)
echo "Invalid option: -$OPTARG" >&2
exit 1
;;
esac
done
if [ -z "$StreamName" ]; then
printf "\nNo stream name was provided. Aborting.\n"
exit 1
fi
if [ "$Profile" != "" ]; then
Profile="$Profile"
fi
if [ "$EndpointUrl" != "" ]; then
EndpointUrl="--endpoint-url $EndpointUrl --no-verify-ssl"
fi
StreamDescription=$(aws kinesis describe-stream --stream-name $StreamName $Profile $EndpointUrl | jq -r ".StreamDescription")
Status=$(echo $StreamDescription | jq -r ".StreamStatus")
if [ "${Status}" != "ACTIVE" ]; then
echo "Stream is not active!"
exit 1
fi
Shard=$(echo $StreamDescription | jq -r ".Shards[$ShardId]")
ShardId=$(echo $Shard | jq -r ".ShardId")
Iterator=$(aws kinesis get-shard-iterator --stream-name $StreamName --shard-id $ShardId --shard-iterator-type $ShardIteratorType $EndpointUrl | jq -r ".ShardIterator")
printf "Beginning to tail kinesis stream : [%s] at %s ...\n" $StreamName $ShardId
while : ; do
response=$(aws kinesis get-records --shard-iterator $Iterator --limit $Limit $Profile $EndpointUrl)
records=$(echo $response | jq -r ".Records")
if [ ${#records} -ge 3 ]; then
echo "Milliseconds Behind: $(echo $response | jq -r ".MillisBehindLatest")\n"
echo $(echo $records | jq -r ".[] | .Data" | base64 -D)
fi
Iterator=$(echo $response | jq -r ".NextShardIterator")
sleep 2
done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment