@dvannoy
Last active November 10, 2021 17:44
Notes and examples for ksqlDB

ksqlDB Overview

Table

https://docs.ksqldb.io/en/latest/concepts/tables/ "A table is a mutable, partitioned collection that models change over time...represents what is true as of 'now'"

A table can have a primary key; updates and deletes are applied per key, which makes tables very useful for accurate aggregations when data may change over time.
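As a sketch of per-key updates (the topic and column names below are made up for illustration), a table defined with a PRIMARY KEY keeps only the latest value for each key:

```sql
-- Hypothetical example: a table keyed on user_id.
-- Later records with the same user_id overwrite earlier ones,
-- so queries over the table reflect the current state per key.
CREATE TABLE user_profiles (
  user_id VARCHAR PRIMARY KEY,
  email VARCHAR,
  plan VARCHAR
) WITH (
  KAFKA_TOPIC = 'user-profiles',
  VALUE_FORMAT = 'JSON'
);
```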

Stream

https://docs.ksqldb.io/en/latest/concepts/streams/ "A stream is a partitioned, immutable, append-only collection that represents a series of historical facts." In other words, this is a Kafka topic but with a schema specific to KSQL added on top of it. You can define a stream on an existing Kafka topic (similar to external table in Hive or Spark SQL). You often use KSQL to create new streams which are consumable as Kafka topics.
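A rough sketch of both patterns (topic and column names are made up): defining a stream over an existing topic, then deriving a new stream, which is backed by a new Kafka topic.

```sql
-- Define a stream on an existing topic
-- (similar to an external table in Hive or Spark SQL).
CREATE STREAM pageviews (
  user VARCHAR KEY,
  page VARCHAR,
  durationSeconds BIGINT
) WITH (
  KAFKA_TOPIC = 'pageviews',
  VALUE_FORMAT = 'JSON'
);

-- Derive a new stream; its results are written to a new topic
-- that any Kafka consumer can read.
CREATE STREAM long_pageviews AS
  SELECT * FROM pageviews
  WHERE durationSeconds > 60
  EMIT CHANGES;
```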

From the streams docs: "Both KEY and VALUE columns can be NULL. No special processing is done if two rows have the same key." Tables handle duplicate keys differently, as described below.

Deletes - NULL/Tombstones

Deletes (tombstone records) are records with a valid key and a null value. Tables respect these as deletes; streams just treat them as null values. "An important difference between tables and streams is that a record with a non-null key and a null value has a special semantic meaning: in a table, this kind of record is a tombstone, which tells KSQL to “DELETE this key from the table”. For a stream, null is a value like any other, with no special meaning." https://docs.confluent.io/5.2.0/ksql/docs/developer-guide/aggregate-streaming-data.html#tombstone-records

I'm not sure an aggregate will pick up a tombstone if the deleted key is not in the GROUP BY. Here is the SO post about grouping on tombstone values: https://stackoverflow.com/questions/57882216/performing-group-by-on-tombstone-values/57892565

Queries

https://docs.ksqldb.io/en/latest/concepts/queries/

  • Persistent - continuously running and emitting events; used to create new streams and new tables
  • Push - subscribe to the results of a query (consume); results are returned continuously but not persisted to Kafka
  • Pull - get the results of a query as of now; returns immediately with a finite result
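A sketch of the three query types (stream, table, and column names are hypothetical):

```sql
-- Persistent query: CREATE ... AS SELECT runs continuously
-- and writes its results to a new topic.
CREATE TABLE user_counts AS
  SELECT user, COUNT(*) AS cnt
  FROM pageviews
  GROUP BY user
  EMIT CHANGES;

-- Push query: subscribe to ongoing results (note EMIT CHANGES);
-- results are returned to the client, not persisted to Kafka.
SELECT user, page FROM pageviews EMIT CHANGES;

-- Pull query: point-in-time lookup against a materialized table;
-- returns immediately with a finite result.
SELECT * FROM user_counts WHERE user = 'alice';
```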

Aggregations

Similar to normal SQL; the extra function latest_by_offset() is helpful for getting the latest value instead of the max. The GROUP BY fields end up defining the key and partitioning of the output topic.
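For example (stream and column names are made up), LATEST_BY_OFFSET returns the most recently received value per group based on offset order, whereas MAX returns the largest value:

```sql
-- Hypothetical aggregation: count events per user and keep the
-- most recently received status (by offset, not by value).
CREATE TABLE user_status AS
  SELECT user,
         COUNT(*) AS event_count,
         LATEST_BY_OFFSET(status) AS latest_status
  FROM user_events
  GROUP BY user
  EMIT CHANGES;
```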

9 minute video from Confluent: https://www.youtube.com/embed/db5SsmNvej4

Windows

https://docs.ksqldb.io/en/latest/concepts/time-and-windows-in-ksqldb-queries/#windows-in-sql-queries "KSQL tracks windows per record key." Use WINDOWSTART and WINDOWEND to access window boundaries, very helpful for hopping windows.

For Tumbling and Hopping windows, a one minute window will start over at the top of each minute (and hourly starts at top of each hour). This means data arriving at 23:01:59 and at 23:02:01 will be in different windows.

RETENTION - "amount of time ksqlDB stores windowed results"

GRACE PERIOD - "the length of time each window continues to accept and process late-arriving events after the end of the window" Note: This didn't work the way it sounds in my tests; it still counted data in some windows despite the data arriving later than when I calculated the grace period to end. I will update if I get a good test scenario showing where it kicks in and where it does not.

Tumbling Window

Tumbling window is the most likely choice for analytics. A tumbling window outputs the latest calculation within the window whenever an update happens. For example, with a 5 minute window:

  • Input
    • Record1: 100, KafkaTimestamp: 2021-05-12 00:00
    • Record2: 50, KafkaTimestamp: 2021-05-12 00:02
    • Record3: 25, KafkaTimestamp: 2021-05-12 00:06
  • Sum Output
    • 100
    • 150
    • 25

You will likely include a WINDOWSTART and/or WINDOWEND time to make it easy to see which window the record represents. If trying to represent the last day, this would allow the consumer to expire the record once WINDOWSTART is older than what is desired.

Example query:

```sql
SELECT user,
       COUNT(1) AS cnt,
       SUM(durationSeconds) AS totalDurationSeconds,
       LATEST_BY_OFFSET(eventTimestamp) AS latestTimestamp,
       MAX(ROWTIME) AS mxRowtime,
       TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS START_WINDOW,
       TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS END_WINDOW
FROM USAGE_TEST
WINDOW TUMBLING (SIZE 1 DAY, RETENTION 30 DAYS, GRACE PERIOD 7 DAYS)
GROUP BY user
EMIT CHANGES;
```

Hopping Window

It's less likely that you will want a hopping window since the windows overlap, meaning your data will show up in multiple windows. This puts more burden on the consumer of the results to display or analyze the correct window. This is a good example of where a hopping window would make sense (note the use of WINDOWSTART to distinguish windows). https://kafka-tutorials.confluent.io/create-hopping-windows/ksql.html
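A minimal sketch of a hopping window (stream and column names are hypothetical): 30-minute windows advancing every 10 minutes, so each record lands in up to three overlapping windows, with WINDOWSTART used to tell them apart.

```sql
-- Each record appears in up to SIZE/ADVANCE windows;
-- WINDOWSTART distinguishes which window a row belongs to.
SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS window_start,
       user,
       COUNT(*) AS cnt
FROM pageviews
WINDOW HOPPING (SIZE 30 MINUTES, ADVANCE BY 10 MINUTES)
GROUP BY user
EMIT CHANGES;
```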

Session Window

A window is created per key based on a "timeout" interval. Once a record is received, the window starts and lasts until there is a gap in activity the length of the window. For example, with a 5 minute session window, all activity for a key is grouped together until 5 minutes pass without activity. Late-arriving records cause the affected session windows to be recalculated.
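A sketch of a session window query (stream and column names are made up):

```sql
-- Events for each user are grouped into one session until
-- 5 minutes pass with no activity for that key.
SELECT user,
       COUNT(*) AS events_in_session,
       WINDOWSTART AS session_start,
       WINDOWEND AS session_end
FROM pageviews
WINDOW SESSION (5 MINUTES)
GROUP BY user
EMIT CHANGES;
```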

Joins

https://docs.ksqldb.io/en/latest/developer-guide/joins/join-streams-and-tables/
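A minimal stream-table join sketch (the stream, table, and column names are hypothetical): each stream record is enriched with the table's current value for that key.

```sql
-- Enrich a stream with a table; tombstones and updates to the
-- table change what future stream records join against.
CREATE STREAM enriched_pageviews AS
  SELECT p.user, p.page, u.plan
  FROM pageviews p
  LEFT JOIN user_profiles u
    ON p.user = u.user_id
  EMIT CHANGES;
```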

Other links

https://www.confluent.io/blog/how-real-time-stream-processing-works-with-ksqldb/

https://docs.ksqldb.io/en/latest/reference/sql/data-types/

https://docs.ksqldb.io/en/latest/tutorials/materialized/

https://docs.ksqldb.io/en/latest/tutorials/etl/

Operations and Scaling

"To scale ksqlDB horizontally, run additional ksqlDB servers with the same ksql.service.id and ksql.streams.bootstrap.servers settings." https://docs.ksqldb.io/en/latest/operate-and-deploy/capacity-planning/#scaling-ksqldb
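A config sketch based on the quote above (the values are placeholders, not recommendations); every server sharing these two settings joins the same cluster and shares the work:

```properties
# Servers with the same service id and bootstrap servers
# form one ksqlDB cluster and split the query workload.
ksql.service.id=my_ksql_cluster_
ksql.streams.bootstrap.servers=broker1:9092,broker2:9092
```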

There is an option to run headless (non-interactive), which could be good for a deployed ksqlDB application where no one needs to connect via the KSQL CLI anyway. https://docs.ksqldb.io/en/latest/operate-and-deploy/capacity-planning/#interactive-ksqldb-servers-vs-non-interactive-headless-servers

Helpful Stack Overflow links: Scaling a KTable to billions of records - https://stackoverflow.com/questions/63166093/ktable-with-billions-unique-keys?rq=1

KTable requires state store - https://stackoverflow.com/questions/56943048/cost-of-kstream-vs-cost-of-ktable-with-respect-to-the-state-store?rq=1
