@pnc
Last active September 1, 2022 01:01
Amazon Kinesis Firehose Survival Guide

Kinesis Firehose Survival Guide

Avoid regular Kinesis if you can

It's every bit as complicated to build software against as Apache Kafka, and reasoning about repartitioning, retry, and failover is a huge pain that only pays off if you have TB/s of data. Kinesis Firehose will happily push many, many MB/s or maybe even GB/s without the complexity.

Kinesis Firehose is pay-by-the-drink (price per byte) and pretty darn cheap. Real Kinesis (price per hour) gets expensive fast, since it involves provisioned infrastructure. Ditto Kinesis Data Analytics. One more reason not to use it.

If you find yourself writing a Kinesis (Data Streams) consumer, run screaming into the night. Here is the first half of the steps to connect to Kinesis and consume data:

The KCL acts as an intermediary between your record processing logic and Kinesis Data Streams. The KCL performs the following tasks:

  • Connects to the data stream
  • Enumerates the shards within the data stream
  • Uses leases to coordinate shard associations with its workers
  • Instantiates a record processor for every shard it manages
  • Pulls data records from the data stream

Do you want to do that, plus the other half of the steps? You sure don't!

Getting data into Firehose is incredibly easy. Ideally you will do a little batching client-side to increase the amount of data sent per request, which reduces your per-request fees a little.
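
A minimal sketch of that client-side batching in Python, assuming a boto3 Firehose client is passed in. The function names and stream name are made up for illustration, and the 500-record / 4 MiB per-call limits are the PutRecordBatch limits as I understand them, so check them against the current docs:

```python
import json

MAX_RECORDS = 500            # assumed PutRecordBatch limit: records per call
MAX_BYTES = 4 * 1024 * 1024  # assumed PutRecordBatch limit: bytes per call


def to_batches(events, max_records=MAX_RECORDS, max_bytes=MAX_BYTES):
    """Group events into lists of Firehose records that fit in one call."""
    batch, size = [], 0
    for event in events:
        # One JSON object per line, so the S3 objects are newline-delimited JSON.
        data = (json.dumps(event) + "\n").encode("utf-8")
        if batch and (len(batch) >= max_records or size + len(data) > max_bytes):
            yield batch
            batch, size = [], 0
        batch.append({"Data": data})
        size += len(data)
    if batch:
        yield batch


def send(firehose, stream_name, events):
    """Send events in batches; return the records Firehose rejected."""
    failed = []
    for batch in to_batches(events):
        resp = firehose.put_record_batch(DeliveryStreamName=stream_name, Records=batch)
        if resp.get("FailedPutCount", 0):
            # RequestResponses lines up with Records; failed entries carry an ErrorCode.
            failed += [rec for rec, res in zip(batch, resp["RequestResponses"])
                       if "ErrorCode" in res]
    return failed
```

You would call it as send(boto3.client("firehose"), "your-stream", events) and retry whatever comes back in the returned list.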

Define your queries first

Kinesis Firehose ultimately wants to dump data into something, probably S3. The best way to query data in S3 is Athena, which is just Presto. So if you are sending this into Firehose:

{"user": 123, "speed": 22.2}
{"user": 123, "speed": 22.4}
{"user": 666, "speed": 0}
{"user": 123, "speed": 15.9}

then Kinesis Firehose is putting these in S3 (by default) in split-by-size-and-time chunks:

s3://yourbucket/someprefix/2022-08-31/QBKLD303923022933-shard1.json.gz
s3://yourbucket/someprefix/2022-08-31/QBKLD303923022933-shard2.json.gz

and you want "max speed of each user", that's spelled

CREATE EXTERNAL TABLE user_speeds ( `user` int, speed double )
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://yourbucket/someprefix/'

and then

SELECT user, MAX(speed) FROM user_speeds GROUP BY user

in Athena and it will cost you a few cents to run that. If you need to roll these up, you can set up a Lambda (or a Data Pipeline or Step Function if you don't respect yourself or your own time) to run e.g.

-- assumes user_speeds has a dt partition column (see "Plan to partition")
INSERT INTO user_speeds_rollup(dt, user, speed)
SELECT dt, user, MAX(speed)
FROM user_speeds
WHERE dt = '2022-08-31'
GROUP BY dt, user

and then querying that rollup table is even cheaper. But you might not need it depending on how often you query.
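
A rough sketch of what such a rollup Lambda might look like in Python, assuming boto3's Athena client. The database name, the results location, and the existence of a dt partition column and a user_speeds_rollup table are all assumptions for illustration. Note that start_query_execution only kicks the query off; it does not wait for it to finish.

```python
import datetime


def rollup_query(day):
    """Build the daily rollup INSERT for one date (a datetime.date)."""
    dt = day.isoformat()  # e.g. '2022-08-31'; never interpolate untrusted text here
    # "user" is double-quoted defensively in case the engine treats it as a keyword.
    return (
        'INSERT INTO user_speeds_rollup (dt, "user", speed) '
        'SELECT dt, "user", MAX(speed) '
        "FROM user_speeds "
        f"WHERE dt = '{dt}' "
        'GROUP BY dt, "user"'
    )


def handler(event, context, athena=None):
    """Lambda entry point: roll up yesterday's data."""
    if athena is None:
        import boto3  # available in the Lambda Python runtime
        athena = boto3.client("athena")
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    resp = athena.start_query_execution(
        QueryString=rollup_query(yesterday),
        QueryExecutionContext={"Database": "default"},  # assumed database name
        ResultConfiguration={
            "OutputLocation": "s3://yourbucket/athena-results/"  # assumed location
        },
    )
    return resp["QueryExecutionId"]
```

Trigger it once a day from a scheduled rule and you have a rollup without any pipeline machinery.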

Plan to partition

By default, that simple query above will query all your data, which will get bigger and bigger. You want to partition your data by day, day+hour, customer+day+hour, or whatever makes sense for your query patterns.

Read both of these documents to understand the very powerful (but kind of confusing) partitioning that Firehose supports:

https://aws.amazon.com/blogs/big-data/kinesis-data-firehose-now-supports-dynamic-partitioning-to-amazon-s3/

https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html

It's not obvious, but since Athena is really just Presto, and since Presto leans heavily on Hive, for it to "understand" your partitions in S3 without you mapping each one by hand, they must be named using the keyname=value convention (e.g. myprefix/dt=2022-08-31/), full stop. Make sure you get this right in your application/Firehose setup or it's a pain in the neck to change later.
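
To make the naming convention concrete, here is a small Python sketch that builds a Hive-style prefix. The helper name is made up, and the FIREHOSE_PREFIX string is only my understanding of Firehose's timestamp prefix expression, so verify the syntax against the docs linked above before relying on it.

```python
def hive_prefix(base, partitions):
    """Build an S3 key prefix like 'myprefix/dt=2022-08-31/' from ordered pairs."""
    parts = [f"{key}={value}" for key, value in partitions]
    return "/".join([base.strip("/")] + parts) + "/"


# Roughly what the equivalent might look like in a delivery stream's S3 prefix
# setting (assumed syntax; check the Firehose docs).
FIREHOSE_PREFIX = "myprefix/dt=!{timestamp:yyyy-MM-dd}/"

print(hive_prefix("myprefix", [("dt", "2022-08-31"), ("hour", "13")]))
```

Partition order matters: put the key you filter on most often first, since that is the directory level every query has to walk through.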

You can send tons of different formats to Firehose, but I recommend newline-delimited JSON because it's boring and it works and Athena can query it all day long.

If query performance/price becomes an issue, you can always convert your incoming JSON records into Apache Parquet, which is cheaper and faster to query from Athena.

Untrusted clients/clients that can't run the AWS SDK

If you need to send data to Kinesis Firehose but don't want to issue temporary AWS STS credentials, you don't trust your clients, or your clients can't run the AWS SDK, you can either:

  1. Make a Lambda that authenticates the client, formats the data, and submits it to Firehose. Expose this Lambda using API Gateway in proxy mode (do NOT use the awful HTTP mode unless you hate yourself). See https://dev.to/aws-builders/4-ways-of-executing-lambda-function-via-http-endpoint-a-comparison-560b for a comparison of "ways to turn HTTP into Lambda" but you can also just believe me and do https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html. (You can use AWS SAM CLI to reduce the amount of clicking you have to do, or you can try the AWS CDK if you want to waste a lot of time debugging new, under-tested software.)
  2. If your clients are anonymous or you don't need authentication, make an S3 bucket with nothing in it, enable the website endpoint, enable access logging (to another bucket), and configure your clients to hit URLs like http://yourbucket.s3-website-us-east-1.amazonaws.com/?data={"your":"json data"} (URL-encode the JSON; S3 website endpoints only speak plain HTTP). Then query those access logs from Athena just like you would query data from Firehose. You can use Presto's json_parse function to parse out the actual data you care about. See Amazon's docs for setting up an Athena table that queries S3 bucket access logs.
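
For option 1, a bare-bones sketch of the Lambda in Python, assuming an API Gateway Lambda proxy event and a boto3 Firehose client. The stream name, the x-api-key header, and the shared-secret check are placeholders for whatever authentication you actually use.

```python
import base64
import hmac
import json

STREAM_NAME = "your-delivery-stream"  # hypothetical delivery stream name
API_KEY = "change-me"                 # hypothetical shared secret; load from config in practice


def handler(event, context, firehose=None):
    """API Gateway (Lambda proxy integration) -> Firehose."""
    if firehose is None:
        import boto3  # available in the Lambda Python runtime
        firehose = boto3.client("firehose")

    # Assumes lowercased header names, as HTTP API proxy events deliver them.
    supplied = (event.get("headers") or {}).get("x-api-key", "")
    if not hmac.compare_digest(supplied.encode(), API_KEY.encode()):
        return {"statusCode": 401, "body": "unauthorized"}

    try:
        body = event.get("body") or ""
        if event.get("isBase64Encoded"):
            body = base64.b64decode(body).decode("utf-8")
        record = json.loads(body)
    except ValueError:  # covers bad base64, bad UTF-8, and bad JSON
        return {"statusCode": 400, "body": "invalid JSON"}

    # Re-serialize so exactly one JSON object lands on each line in S3.
    data = (json.dumps(record) + "\n").encode("utf-8")
    firehose.put_record(DeliveryStreamName=STREAM_NAME, Record={"Data": data})
    return {"statusCode": 202, "body": "accepted"}
```

The firehose parameter exists only so you can pass a fake client in tests; Lambda itself calls handler(event, context).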

Other stuff

  • DO NOT forget to include a timestamp of some kind in your input data. Firehose kind of adds one automatically, but it's not useful and doesn't appear in the S3 data.
  • If you do use newline-delimited JSON, you can peek at records in S3 with e.g.:
     aws s3 cp s3://yourbucket/someprefix/2022-08-31/QBKLD303923022933-shard2.json.gz - | zcat | jq . | less
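
On the timestamp point above, one way to stamp each event before sending it, sketched in Python. The field name "ts" is just an example; pick whatever suits your schema.

```python
import datetime


def stamp(event, now=None):
    """Return a copy of the event with a UTC ISO 8601 'ts' field added."""
    now = now or datetime.datetime.now(datetime.timezone.utc)
    return {**event, "ts": now.isoformat()}
```

An ISO 8601 string sorts correctly as text, and Presto's from_iso8601_timestamp should be able to turn it back into a real timestamp at query time.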
    